git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Revise LockWAL/UnlockWAL implementation (#11020)
author Yanqin Jin <yanqin@fb.com>
Wed, 14 Dec 2022 05:45:00 +0000 (21:45 -0800)
committer Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Wed, 14 Dec 2022 05:45:00 +0000 (21:45 -0800)
Summary:
RocksDB has two public APIs: `DB::LockWAL()`/`DB::UnlockWAL()`. The current implementation acquires and
releases the internal `DBImpl::log_write_mutex_`.

According to the comment on `DBImpl::log_write_mutex_`: https://github.com/facebook/rocksdb/blob/7.8.fb/db/db_impl/db_impl.h#L2287:L2288
> Note: to avoid dealock, if needed to acquire both log_write_mutex_ and mutex_, the order should be first mutex_ and then log_write_mutex_.

This puts limitations on how applications can use the `LockWAL()` API. After `LockWAL()` returns ok, the application
should not perform any operation that acquires `mutex_`. Currently, the use case of `LockWAL()` is MyRocks implementing
the MySQL storage engine handlerton `lock_hton_log` interface. The operation that MyRocks performs after `LockWAL()`
is `GetSortedWalFiles()`, which acquires not only `mutex_` but also `log_write_mutex_`.

There are two issues:
1. Applications using these two APIs may hang if one thread calls `GetSortedWalFiles()` after
calling `LockWAL()`, because `log_write_mutex_` is not recursive.
2. Two threads may deadlock due to lock order inversion.

To fix these issues, we can modify the implementation of LockWAL so that it does not keep
`log_write_mutex_` held until UnlockWAL. To achieve the goal of locking the WAL, we can
instead manually inject a write stall so that all future writes will be stopped.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11020

Test Plan: make check

Reviewed By: ajkr

Differential Revision: D41785203

Pulled By: riversand963

fbshipit-source-id: 5ccb7a9c6eb9a2c3fa80fd2c399cc2568b8f89ce

HISTORY.md
db/db_impl/db_impl.cc
db/db_impl/db_impl.h
db/db_impl/db_impl_write.cc
db/db_wal_test.cc
db/write_thread.cc
include/rocksdb/db.h
utilities/transactions/transaction_test.cc

index 31466650127b7ed9c7274335c3d58fb0b3ea738a..f695158d7fa801593edadaadb7d5b2aeb624be9d 100644 (file)
@@ -11,6 +11,7 @@
 * Fixed a bug caused by `DB::SyncWAL()` affecting `track_and_verify_wals_in_manifest`. Without the fix, application may see "open error: Corruption: Missing WAL with log number" while trying to open the db. The corruption is a false alarm but prevents DB open (#10892).
 * Fixed a BackupEngine bug in which RestoreDBFromLatestBackup would fail if the latest backup was deleted and there is another valid backup available.
 * Fix L0 file misorder corruption caused by ingesting files of overlapping seqnos with memtable entries' through introducing `epoch_number`. Before the fix, `force_consistency_checks=true` may catch the corruption before it's exposed to readers, in which case writes returning `Status::Corruption` would be expected. Also replace the previous incomplete fix (#5958) to the same corruption with this new and more complete fix.
+* Fixed a bug in LockWAL() leading to re-locking mutex (#11020).
 
 ## 7.9.0 (11/21/2022)
 ### Performance Improvements
index 657d2870f4031e7c07b4a4a3e42036d076d24ada..411503a6fe17055a5abbee146ebfdc4afe712595 100644 (file)
@@ -1570,21 +1570,31 @@ Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) {
 }
 
 Status DBImpl::LockWAL() {
-  log_write_mutex_.Lock();
-  auto cur_log_writer = logs_.back().writer;
-  IOStatus status = cur_log_writer->WriteBuffer();
-  if (!status.ok()) {
-    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
-                    status.ToString().c_str());
-    // In case there is a fs error we should set it globally to prevent the
-    // future writes
-    WriteStatusCheck(status);
+  {
+    InstrumentedMutexLock lock(&mutex_);
+    WriteThread::Writer w;
+    write_thread_.EnterUnbatched(&w, &mutex_);
+    WriteThread::Writer nonmem_w;
+    if (two_write_queues_) {
+      nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
+    }
+
+    lock_wal_write_token_ = write_controller_.GetStopToken();
+
+    if (two_write_queues_) {
+      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
+    }
+    write_thread_.ExitUnbatched(&w);
   }
-  return static_cast<Status>(status);
+  return FlushWAL(/*sync=*/false);
 }
 
 Status DBImpl::UnlockWAL() {
-  log_write_mutex_.Unlock();
+  {
+    InstrumentedMutexLock lock(&mutex_);
+    lock_wal_write_token_.reset();
+  }
+  bg_cv_.SignalAll();
   return Status::OK();
 }
 
index 920c3d3f836fd26bd0e236b5016949fb852b99e7..0eebef77483a55e482b16a49146b0e2ef5fb8854 100644 (file)
@@ -2680,6 +2680,10 @@ class DBImpl : public DB {
   // seqno_time_mapping_ stores the sequence number to time mapping, it's not
   // thread safe, both read and write need db mutex hold.
   SeqnoToTimeMapping seqno_time_mapping_;
+
+  // stop write token that is acquired when LockWal() is called. Destructed
+  // when UnlockWal() is called.
+  std::unique_ptr<WriteControllerToken> lock_wal_write_token_;
 };
 
 class GetWithTimestampReadCallback : public ReadCallback {
index a597c168dcfc8221fd025a33a2dff4824a4140ad..cbeab046fd0ffa42572db09cafd6f9765aa98353 100644 (file)
@@ -924,6 +924,15 @@ Status DBImpl::WriteImplWALOnly(
       write_thread->ExitAsBatchGroupLeader(write_group, status);
       return status;
     }
+  } else {
+    InstrumentedMutexLock lock(&mutex_);
+    Status status = DelayWrite(/*num_bytes=*/0ull, write_options);
+    if (!status.ok()) {
+      WriteThread::WriteGroup write_group;
+      write_thread->EnterAsBatchGroupLeader(&w, &write_group);
+      write_thread->ExitAsBatchGroupLeader(write_group, status);
+      return status;
+    }
   }
 
   WriteThread::WriteGroup write_group;
@@ -1762,6 +1771,7 @@ uint64_t DBImpl::GetMaxTotalWalSize() const {
 // REQUIRES: this thread is currently at the front of the writer queue
 Status DBImpl::DelayWrite(uint64_t num_bytes,
                           const WriteOptions& write_options) {
+  mutex_.AssertHeld();
   uint64_t time_delayed = 0;
   bool delayed = false;
   {
index 2bdfaada5e5dec93363c7f3ae51abebd31cec7b7..99d0b3c4c8d9fae37a2eddae7747db563e3a5c92 100644 (file)
@@ -610,6 +610,52 @@ TEST_F(DBWALTest, WALWithChecksumHandoff) {
 #endif  // ROCKSDB_ASSERT_STATUS_CHECKED
 }
 
+#ifndef ROCKSDB_LITE
+TEST_F(DBWALTest, LockWal) {
+  do {
+    Options options = CurrentOptions();
+    options.create_if_missing = true;
+    DestroyAndReopen(options);
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->LoadDependency(
+        {{"DBWALTest::LockWal:AfterGetSortedWal",
+          "DBWALTest::LockWal:BeforeFlush:1"}});
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    ASSERT_OK(Put("foo", "v"));
+    ASSERT_OK(Put("bar", "v"));
+    port::Thread worker([&]() {
+      TEST_SYNC_POINT("DBWALTest::LockWal:BeforeFlush:1");
+      Status tmp_s = db_->Flush(FlushOptions());
+      ASSERT_OK(tmp_s);
+    });
+
+    ASSERT_OK(db_->LockWAL());
+    // Verify writes are stopped
+    WriteOptions wopts;
+    wopts.no_slowdown = true;
+    Status s = db_->Put(wopts, "foo", "dontcare");
+    ASSERT_TRUE(s.IsIncomplete());
+    {
+      VectorLogPtr wals;
+      ASSERT_OK(db_->GetSortedWalFiles(wals));
+      ASSERT_FALSE(wals.empty());
+    }
+    TEST_SYNC_POINT("DBWALTest::LockWal:AfterGetSortedWal");
+    FlushOptions flush_opts;
+    flush_opts.wait = false;
+    s = db_->Flush(flush_opts);
+    ASSERT_TRUE(s.IsTryAgain());
+    ASSERT_OK(db_->UnlockWAL());
+    ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare"));
+
+    worker.join();
+
+    SyncPoint::GetInstance()->DisableProcessing();
+  } while (ChangeWalOptions());
+}
+#endif  //! ROCKSDB_LITE
+
 class DBRecoveryTestBlobError
     : public DBWALTest,
       public testing::WithParamInterface<std::string> {
index cc8645f373119bf60c893377f4e71cb95260e07a..de1744cf0489ee6c1879bcc9cc1183a0bdf436e5 100644 (file)
@@ -360,8 +360,11 @@ void WriteThread::EndWriteStall() {
   // Unlink write_stall_dummy_ from the write queue. This will unblock
   // pending write threads to enqueue themselves
   assert(newest_writer_.load(std::memory_order_relaxed) == &write_stall_dummy_);
-  assert(write_stall_dummy_.link_older != nullptr);
-  write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
+  // write_stall_dummy_.link_older can be nullptr only if LockWAL() has been
+  // called.
+  if (write_stall_dummy_.link_older) {
+    write_stall_dummy_.link_older->link_newer = write_stall_dummy_.link_newer;
+  }
   newest_writer_.exchange(write_stall_dummy_.link_older);
 
   // Wake up writers
index 26c07c19f0c823e35d639340c32dee71bc39bea5..cad3d1c3c81ae96a5f83e8f6fa8e0690bbe4447c 100644 (file)
@@ -1445,11 +1445,17 @@ class DB {
   virtual Status SyncWAL() = 0;
 
   // Lock the WAL. Also flushes the WAL after locking.
+  // After this method returns ok, writes to the database will be stopped until
+  // UnlockWAL() is called.
+  // This method may internally acquire and release DB mutex and the WAL write
+  // mutex, but after it returns, neither mutex is held by caller.
   virtual Status LockWAL() {
     return Status::NotSupported("LockWAL not implemented");
   }
 
   // Unlock the WAL.
+  // The write stop on the database will be cleared.
+  // This method may internally acquire and release DB mutex.
   virtual Status UnlockWAL() {
     return Status::NotSupported("UnlockWAL not implemented");
   }
index caf1566b911ba07287fec48d547b23d90aaa0dec..e035e1f889022e69c38fa45e30f157183675f956 100644 (file)
@@ -6530,6 +6530,65 @@ TEST_P(TransactionTest, WriteWithBulkCreatedColumnFamilies) {
   cf_handles.clear();
 }
 
+TEST_P(TransactionTest, LockWal) {
+  const TxnDBWritePolicy write_policy = std::get<2>(GetParam());
+  if (TxnDBWritePolicy::WRITE_COMMITTED != write_policy) {
+    ROCKSDB_GTEST_BYPASS("Test only write-committed for now");
+    return;
+  }
+  ASSERT_OK(ReOpen());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"TransactionTest::LockWal:AfterLockWal",
+        "TransactionTest::LockWal:BeforePrepareTxn2"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  std::unique_ptr<Transaction> txn0;
+  WriteOptions wopts;
+  wopts.no_slowdown = true;
+  txn0.reset(db->BeginTransaction(wopts, TransactionOptions()));
+  ASSERT_OK(txn0->SetName("txn0"));
+  ASSERT_OK(txn0->Put("foo", "v0"));
+
+  std::unique_ptr<Transaction> txn1;
+  txn1.reset(db->BeginTransaction(wopts, TransactionOptions()));
+  ASSERT_OK(txn1->SetName("txn1"));
+  ASSERT_OK(txn1->Put("dummy", "v0"));
+  ASSERT_OK(txn1->Prepare());
+
+  std::unique_ptr<Transaction> txn2;
+  port::Thread worker([&]() {
+    txn2.reset(db->BeginTransaction(WriteOptions(), TransactionOptions()));
+    ASSERT_OK(txn2->SetName("txn2"));
+    ASSERT_OK(txn2->Put("bar", "v0"));
+    TEST_SYNC_POINT("TransactionTest::LockWal:BeforePrepareTxn2");
+    ASSERT_OK(txn2->Prepare());
+    ASSERT_OK(txn2->Commit());
+  });
+  ASSERT_OK(db->LockWAL());
+  // txn0 cannot prepare
+  Status s = txn0->Prepare();
+  ASSERT_TRUE(s.IsIncomplete());
+  // txn1 cannot commit
+  s = txn1->Commit();
+  ASSERT_TRUE(s.IsIncomplete());
+
+  TEST_SYNC_POINT("TransactionTest::LockWal:AfterLockWal");
+
+  ASSERT_OK(db->UnlockWAL());
+  txn0.reset();
+
+  txn0.reset(db->BeginTransaction(wopts, TransactionOptions()));
+  ASSERT_OK(txn0->SetName("txn0_1"));
+  ASSERT_OK(txn0->Put("foo", "v1"));
+  ASSERT_OK(txn0->Prepare());
+  ASSERT_OK(txn0->Commit());
+  worker.join();
+
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {