]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix IOError on WAL write doesn't propagate to write group follower
authorYi Wu <yiwu@fb.com>
Tue, 28 Nov 2017 19:40:40 +0000 (11:40 -0800)
committerYi Wu <yiwu@fb.com>
Tue, 28 Nov 2017 20:33:26 +0000 (12:33 -0800)
Summary:
This is a simpler version of #3097 by removing all unrelated changes.

Fixing the bug where concurrent writes may get Status::OK while it actually gets IOError on WAL write. This happens when multiple writes form a write batch group, and the leader gets an IOError while writing to WAL. The leader fails to pass the error to followers in the group, and the followers end up returning Status::OK() while actually writing nothing. The bug only affects writes in a batch group. Future writes after the batch group will correctly return immediately with the IOError.
Closes https://github.com/facebook/rocksdb/pull/3201

Differential Revision: D6421644

Pulled By: yiwu-arbug

fbshipit-source-id: 1c2a455c5b73f6842423785eb8a9dbfbb191dc0e

HISTORY.md
db/db_impl_write.cc
db/db_write_test.cc
db/write_thread.cc

index 9156290e0c13448759c9fec66c05310d5d79afa3..1d9a666be87ba58cb4a09c4333f5c8c151260971 100644 (file)
@@ -1,4 +1,6 @@
 # Rocksdb Change Log
+* Fix IOError on WAL write doesn't propagate to write group follower
+
 ## 5.8.6 (11/20/2017)
 ### Bug Fixes
 * Fixed aligned_alloc issues with Windows.
index 8a11948f7e30736e533f921bf2c79aab09d10b77..2b06c7d710c49b63a1b364bffdab966d4b75896e 100644 (file)
@@ -319,7 +319,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       versions_->SetLastSequence(last_sequence);
     }
     MemTableInsertStatusCheck(w.status);
-    write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
+    write_thread_.ExitAsBatchGroupLeader(write_group, status);
   }
 
   if (status.ok()) {
@@ -543,7 +543,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
   if (!w.CallbackFailed()) {
     WriteCallbackStatusCheck(status);
   }
-  nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
+  nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, status);
   if (status.ok()) {
     status = w.FinalStatus();
   }
index 726f444fa16471e3be46edf2f49c698e77df90fc..e3e8ad829d7b856d3ef795e22ac82963e2800717 100644 (file)
@@ -3,12 +3,17 @@
 //  COPYING file in the root directory) and Apache 2.0 License
 //  (found in the LICENSE.Apache file in the root directory).
 
+#include <atomic>
 #include <memory>
 #include <thread>
 #include <vector>
 #include "db/db_test_util.h"
 #include "db/write_batch_internal.h"
+#include "db/write_thread.h"
+#include "port/port.h"
 #include "port/stack_trace.h"
+#include "util/fault_injection_test_env.h"
+#include "util/string_util.h"
 #include "util/sync_point.h"
 
 namespace rocksdb {
@@ -18,7 +23,9 @@ class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
  public:
   DBWriteTest() : DBTestBase("/db_write_test") {}
 
-  void Open() { DBTestBase::Reopen(GetOptions(GetParam())); }
+  Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
+
+  void Open() { DBTestBase::Reopen(GetOptions()); }
 };
 
 // Sequence number should be return through input write batch.
@@ -67,6 +74,47 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) {
   }
 }
 
+TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
+  constexpr int kNumThreads = 5;
+  std::unique_ptr<FaultInjectionTestEnv> mock_env(
+      new FaultInjectionTestEnv(Env::Default()));
+  Options options = GetOptions();
+  options.env = mock_env.get();
+  Reopen(options);
+  std::atomic<int> ready_count{0};
+  std::atomic<int> leader_count{0};
+  std::vector<port::Thread> threads;
+  mock_env->SetFilesystemActive(false);
+  // Wait until all threads linked to write threads, to make sure
+  // all threads join the same batch group.
+  SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
+        ready_count++;
+        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
+        if (w->state == WriteThread::STATE_GROUP_LEADER) {
+          leader_count++;
+          while (ready_count < kNumThreads) {
+            // busy waiting
+          }
+        }
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.push_back(port::Thread(
+        [&](int index) {
+          // All threads should fail.
+          ASSERT_FALSE(Put("key" + ToString(index), "value").ok());
+        },
+        i));
+  }
+  for (int i = 0; i < kNumThreads; i++) {
+    threads[i].join();
+  }
+  ASSERT_EQ(1, leader_count);
+  // Close before mock_env destruct.
+  Close();
+}
+
 INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                         testing::Values(DBTestBase::kDefault,
                                         DBTestBase::kConcurrentWALWrites,
index 2d3b34602ccbb1d23c31a67a510630e3d15dd5c4..afe2f27978e7ce992e0ffd1227a2274d4e3f1b0a 100644 (file)
@@ -532,6 +532,11 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
   Writer* last_writer = write_group.last_writer;
   assert(leader->link_older == nullptr);
 
+  // Propagate memtable write error to the whole group.
+  if (status.ok() && !write_group.status.ok()) {
+    status = write_group.status;
+  }
+
   if (enable_pipelined_write_) {
     // Notify writers don't write to memtable to exit.
     for (Writer* w = last_writer; w != leader;) {