]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Checkpoint dir options fix (#8572)
authorMerlin Mao <qzmao@fb.com>
Fri, 23 Jul 2021 18:11:25 +0000 (11:11 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Fri, 23 Jul 2021 18:13:01 +0000 (11:13 -0700)
Summary:
Originally the 2 options `db_log_dir` and `wal_dir` will be reused in a snapshot db since the options files are just copied. By default, if `wal_dir` was not set when a db was created, it is set to the db's dir. Therefore, the snapshot db will use the same WAL dir. If both the original db and the snapshot db write to or delete from the WAL dir, one may modify or delete files which belong to the other. The same applies to `db_log_dir` as well, but as info log files are not copied or linked, it is simpler for this option.

2 arguments are added to `Checkpoint::CreateCheckpoint()`, allowing to override these 2 options.

`wal_dir`:  If the function argument `wal_dir` is empty, or set to the original db location, or the checkpoint location, the snapshot's `wal_dir` option will be updated to the checkpoint location. Otherwise, the absolute path specified in the argument will be used. During checkpointing, live WAL files will be copied or linked the new location, instead of the current WAL dir specified in the original db.

`db_log_dir`: Same as `wal_dir`, but no files will be copied or linked.

A new unit test was added: `CheckpointTest.CheckpointWithOptionsDirsTest`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8572

Test Plan:
New unit test
```
checkpoint_test --gtest_filter="CheckpointTest.CheckpointWithOptionsDirsTest"
```

Output
```
Note: Google Test filter = CheckpointTest.CheckpointWithOptionsDirsTest
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from CheckpointTest
[ RUN      ] CheckpointTest.CheckpointWithOptionsDirsTest
[       OK ] CheckpointTest.CheckpointWithOptionsDirsTest (11712 ms)
[----------] 1 test from CheckpointTest (11712 ms total)

[----------] Global test environment tear-down
[==========] 1 test from 1 test case ran. (11713 ms total)
[  PASSED  ] 1 test.
```
This test will fail without this patch. Just modify the code to remove the 2 arguments introduced in this patch in `CreateCheckpoint()`.

Reviewed By: zhichao-cao

Differential Revision: D29832761

Pulled By: autopear

fbshipit-source-id: e6a639b4d674380df82998c0839e79cab695fe29

include/rocksdb/utilities/checkpoint.h
utilities/checkpoint/checkpoint_impl.cc
utilities/checkpoint/checkpoint_impl.h
utilities/checkpoint/checkpoint_test.cc

index df2a744033e16f7ea62b0bb6a4a46c48aa72c764..f4f1813ce887ba1a956683b66c33863c62d20240 100644 (file)
@@ -40,9 +40,13 @@ class Checkpoint {
   // sequence_number_ptr: if it is not nullptr, the value it points to will be
   // set to the DB's sequence number. The default value of this parameter is
   // nullptr.
+  // db_log_dir / wal_dir: override db_log_dir or wal_dir option in the
+  // snapshot. If empty, checkpoint_dir will be used.
   virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
                                   uint64_t log_size_for_flush = 0,
-                                  uint64_t* sequence_number_ptr = nullptr);
+                                  uint64_t* sequence_number_ptr = nullptr,
+                                  const std::string& db_log_dir = "",
+                                  const std::string& wal_dir = "");
 
   // Exports all live SST files of a specified Column Family onto export_dir,
   // returning SST files information in metadata.
index f79b0f2ae1adcf9e4bb6cf4fa22431bc30c1b9c0..e7351654ae8d80f587bc726910265b4dc85a2f20 100644 (file)
 #include "db/wal_manager.h"
 #include "file/file_util.h"
 #include "file/filename.h"
+#include "options/options_parser.h"
 #include "port/port.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
 #include "rocksdb/metadata.h"
 #include "rocksdb/transaction_log.h"
 #include "rocksdb/utilities/checkpoint.h"
+#include "rocksdb/utilities/options_util.h"
 #include "test_util/sync_point.h"
 #include "util/cast_util.h"
 #include "util/file_checksum_helper.h"
@@ -39,7 +41,9 @@ Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
 
 Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/,
                                     uint64_t /*log_size_for_flush*/,
-                                    uint64_t* /*sequence_number_ptr*/) {
+                                    uint64_t* /*sequence_number_ptr*/,
+                                    const std::string& /*db_log_dir*/,
+                                    const std::string& /*wal_dir*/) {
   return Status::NotSupported("");
 }
 
@@ -76,7 +80,9 @@ Status Checkpoint::ExportColumnFamily(
 // Builds an openable snapshot of RocksDB
 Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
                                         uint64_t log_size_for_flush,
-                                        uint64_t* sequence_number_ptr) {
+                                        uint64_t* sequence_number_ptr,
+                                        const std::string& db_log_dir,
+                                        const std::string& wal_dir) {
   DBOptions db_options = db_->GetDBOptions();
 
   Status s = db_->GetEnv()->FileExists(checkpoint_dir);
@@ -101,15 +107,54 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
     return Status::InvalidArgument("invalid checkpoint directory name");
   }
 
-  std::string full_private_path =
-      checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
-  ROCKS_LOG_INFO(
-      db_options.info_log,
-      "Snapshot process -- using temporary directory %s",
-      full_private_path.c_str());
+  std::string parsed_checkpoint_dir =
+      checkpoint_dir.substr(0, final_nonslash_idx + 1);
+  std::string full_private_path = parsed_checkpoint_dir + ".tmp";
+  ROCKS_LOG_INFO(db_options.info_log,
+                 "Snapshot process -- using temporary directory %s",
+                 full_private_path.c_str());
   CleanStagingDirectory(full_private_path, db_options.info_log.get());
   // create snapshot directory
   s = db_->GetEnv()->CreateDir(full_private_path);
+
+  // Remove the last `/`s if needed
+  std::string parsed_log_dir =
+      db_log_dir.empty()
+          ? ""
+          : db_log_dir.substr(0, db_log_dir.find_last_not_of('/') + 1);
+  std::string parsed_wal_dir =
+      wal_dir.empty() ? ""
+                      : wal_dir.substr(0, wal_dir.find_last_not_of('/') + 1);
+
+  // Info log files are not copied or linked, just update the option value.
+  std::string value_log_dir = parsed_log_dir == db_->GetName() ||
+                                      parsed_log_dir == parsed_checkpoint_dir
+                                  ? ""
+                                  : parsed_log_dir;
+
+  // If the wal_dir is empty, or the same as the source db dir, update the
+  // option value to the checkpoint dir.
+  std::string value_wal_dir;  // Option value to override
+  std::string new_wal_dir;    // The target location to copy/link WAL files
+  if (parsed_wal_dir.empty() || parsed_wal_dir == db_->GetName() ||
+      parsed_wal_dir == parsed_checkpoint_dir) {
+    value_wal_dir = parsed_checkpoint_dir;
+    new_wal_dir = full_private_path;  // Copy to the temp dir
+  } else {
+    value_wal_dir = parsed_wal_dir;
+    std::string prefix = parsed_checkpoint_dir + "/";
+    // If checkpoint_dir is parent of wal_dir, create the wal dir inside the tmp
+    // dir; otherwise, create it directly.
+    new_wal_dir =
+        parsed_wal_dir.rfind(prefix, 0) == 0
+            ? full_private_path + "/" + parsed_wal_dir.substr(prefix.size())
+            : parsed_wal_dir;
+    s = db_->GetEnv()->FileExists(new_wal_dir);
+    if (s.IsNotFound()) {
+      s = db_->GetEnv()->CreateDir(new_wal_dir);
+    }
+  }
+
   uint64_t sequence_number = 0;
   if (s.ok()) {
     // enable file deletions
@@ -120,21 +165,32 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
       s = CreateCustomCheckpoint(
           db_options,
           [&](const std::string& src_dirname, const std::string& fname,
-              FileType) {
+              FileType type) {
             ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s",
                            fname.c_str());
-            return db_->GetFileSystem()->LinkFile(src_dirname + fname,
-                                                  full_private_path + fname,
-                                                  IOOptions(), nullptr);
+            // WAL file links may be created in another location.
+            return db_->GetFileSystem()->LinkFile(
+                src_dirname + fname,
+                (type == kWalFile ? new_wal_dir : full_private_path) + fname,
+                IOOptions(), nullptr);
           } /* link_file_cb */,
           [&](const std::string& src_dirname, const std::string& fname,
-              uint64_t size_limit_bytes, FileType,
+              uint64_t size_limit_bytes, FileType type,
               const std::string& /* checksum_func_name */,
               const std::string& /* checksum_val */) {
             ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
-            return CopyFile(db_->GetFileSystem(), src_dirname + fname,
-                            full_private_path + fname, size_limit_bytes,
-                            db_options.use_fsync);
+            if (type == kOptionsFile) {
+              // Modify and rewrite option files
+              return CopyOptionsFile(src_dirname + fname,
+                                     full_private_path + fname, value_log_dir,
+                                     value_wal_dir);
+            } else {
+              // Copy other files. WAL files may be copied to another location.
+              return Status(CopyFile(
+                  db_->GetFileSystem(), src_dirname + fname,
+                  (type == kWalFile ? new_wal_dir : full_private_path) + fname,
+                  size_limit_bytes, db_options.use_fsync));
+            }
           } /* copy_file_cb */,
           [&](const std::string& fname, const std::string& contents, FileType) {
             ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
@@ -154,11 +210,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
 
   if (s.ok()) {
     // move tmp private backup to real snapshot directory
-    s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
+    s = db_->GetEnv()->RenameFile(full_private_path, parsed_checkpoint_dir);
   }
   if (s.ok()) {
     std::unique_ptr<Directory> checkpoint_directory;
-    s = db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
+    s = db_->GetEnv()->NewDirectory(parsed_checkpoint_dir,
+                                    &checkpoint_directory);
     if (s.ok() && checkpoint_directory != nullptr) {
       s = checkpoint_directory->Fsync();
     }
@@ -588,6 +645,37 @@ Status CheckpointImpl::ExportFilesInMetaData(
 
   return s;
 }
+
+Status CheckpointImpl::CopyOptionsFile(const std::string& src_file,
+                                       const std::string& target_file,
+                                       const std::string& db_log_dir,
+                                       const std::string& wal_dir) {
+  Status s;
+  DBOptions src_db_options;
+  std::vector<ColumnFamilyDescriptor> src_cf_descs;
+  s = LoadOptionsFromFile(ConfigOptions(), src_file, &src_db_options,
+                          &src_cf_descs);
+  if (!s.ok()) {
+    return s;
+  }
+
+  // Override these 2 options
+  src_db_options.db_log_dir = db_log_dir;
+  src_db_options.wal_dir = wal_dir;
+
+  std::vector<std::string> src_cf_names;
+  std::vector<ColumnFamilyOptions> src_cf_opts;
+  src_cf_names.reserve(src_cf_descs.size());
+  src_cf_opts.reserve(src_cf_descs.size());
+  for (ColumnFamilyDescriptor desc : src_cf_descs) {
+    src_cf_names.push_back(desc.name);
+    src_cf_opts.push_back(desc.options);
+  }
+
+  return PersistRocksDBOptions(src_db_options, src_cf_names, src_cf_opts,
+                               target_file, db_->GetFileSystem());
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 #endif  // ROCKSDB_LITE
index ff9b8f4556d1afaf124233f06bdc5ba8e81092a3..d4aa3a382be0474fd64244b1aa5fb45217ccd90d 100644 (file)
@@ -20,7 +20,9 @@ class CheckpointImpl : public Checkpoint {
 
   Status CreateCheckpoint(const std::string& checkpoint_dir,
                           uint64_t log_size_for_flush,
-                          uint64_t* sequence_number_ptr) override;
+                          uint64_t* sequence_number_ptr,
+                          const std::string& db_log_dir,
+                          const std::string& wal_dir) override;
 
   Status ExportColumnFamily(ColumnFamilyHandle* handle,
                             const std::string& export_dir,
@@ -57,6 +59,11 @@ class CheckpointImpl : public Checkpoint {
                            const std::string& fname)>
           copy_file_cb);
 
+  Status CopyOptionsFile(const std::string& src_file,
+                         const std::string& target_file,
+                         const std::string& db_log_dir,
+                         const std::string& wal_dir);
+
  private:
   DB* db_;
 };
index 82afe9fceaa169c78de9117e531d09c72ddbce2b..17ab9606c5c6c5d62657bcbd2ebede92b1106007 100644 (file)
 #ifndef OS_WIN
 #include <unistd.h>
 #endif
+#include <iomanip>
 #include <iostream>
+#include <sstream>
 #include <thread>
 #include <utility>
+#include <vector>
 
 #include "db/db_impl/db_impl.h"
 #include "file/file_util.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
+#include "rocksdb/convenience.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
+#include "rocksdb/utilities/options_util.h"
 #include "rocksdb/utilities/transaction_db.h"
 #include "test_util/sync_point.h"
 #include "test_util/testharness.h"
@@ -259,6 +264,12 @@ class CheckpointTest : public testing::Test {
     }
     return result;
   }
+
+  static std::string IntToFixedWidthString(size_t i, int len) {
+    std::stringstream ss;
+    ss << std::setw(len) << std::setfill('0') << i;
+    return ss.str();
+  }
 };
 
 TEST_F(CheckpointTest, GetSnapshotLink) {
@@ -902,6 +913,144 @@ TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) {
   delete snapshot_db;
 }
 
+TEST_F(CheckpointTest, CheckpointWithOptionsDirsTest) {
+  // If the checkpoint and the source db share the same wal_dir, files may be
+  // corrupted if both write to or delete from the same wal_dir. db_log_dir
+  // should also be updated during checkpointing, but it is less important since
+  // log files are not copied or linked to the checkpoint.
+
+  // 8 bytes key, 1 kB record, 4 kB MemTable. Each batch should trigger 25
+  // flushes
+  const int key_len = 8;
+  const int value_len = 1016;
+  const size_t num_keys = 100;
+  const size_t buffer_size = 4096;
+
+  std::string value(value_len, ' ');
+
+  std::vector<std::string> dirs1 = {"", "", "", ""};
+  std::vector<std::string> dirs2 = {"/logs", "/wal", "", ""};
+  std::vector<std::string> dirs3 = {"/logs", "/wal", "/logs", "/wal"};
+
+  for (auto dirs : {dirs1, dirs2, dirs3}) {
+    std::string src_log_dir = dirs[0].empty() ? "" : dbname_ + dirs[0];
+    std::string src_wal_dir = dirs[1].empty() ? "" : dbname_ + dirs[1];
+    std::string snap_log_dir = dirs[2].empty() ? "" : snapshot_name_ + dirs[2];
+    std::string snap_wal_dir = dirs[3].empty() ? "" : snapshot_name_ + dirs[3];
+
+    Options src_opts = CurrentOptions();
+    WriteOptions w_opts;
+    ReadOptions r_opts;
+    DB* snapshotDB;
+    Checkpoint* checkpoint;
+
+    src_opts = CurrentOptions();
+    delete db_;
+    db_ = nullptr;
+    ASSERT_OK(DestroyDB(dbname_, src_opts));
+
+    // Create a database
+    src_opts.create_if_missing = true;
+    src_opts.write_buffer_size = buffer_size;
+    src_opts.OptimizeUniversalStyleCompaction(buffer_size);
+    src_opts.db_log_dir = src_log_dir;
+    src_opts.wal_dir = src_wal_dir;
+
+    ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
+
+    // Write to src db
+    for (size_t i = 1; i <= num_keys; i++) {
+      ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value));
+    }
+
+    // Take a snapshot
+    ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+    ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 0, nullptr,
+                                           snap_log_dir, snap_wal_dir));
+
+    // Write to src db again
+    for (size_t i = num_keys + 1; i <= num_keys * 2; i++) {
+      ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value));
+    }
+
+    std::string result;
+    std::string key1 = IntToFixedWidthString(num_keys, key_len);
+    std::string key2 = IntToFixedWidthString(num_keys * 2, key_len);
+    std::string key3 = IntToFixedWidthString(num_keys * 3, key_len);
+
+    ASSERT_OK(db_->Get(r_opts, key1, &result));
+    ASSERT_OK(db_->Get(r_opts, key2, &result));
+
+    // Open snapshot with its own options
+    DBOptions snap_opts;
+    std::vector<ColumnFamilyDescriptor> snap_cfs;
+    ASSERT_OK(LoadLatestOptions(ConfigOptions(), snapshot_name_, &snap_opts,
+                                &snap_cfs));
+
+    ASSERT_EQ(snap_opts.db_log_dir, snap_log_dir);
+    ASSERT_EQ(snap_opts.wal_dir,
+              snap_wal_dir.empty() ? snapshot_name_ : snap_wal_dir);
+
+    std::vector<ColumnFamilyHandle*> handles;
+    ASSERT_OK(
+        DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB));
+    for (ColumnFamilyHandle* handle : handles) {
+      delete handle;
+    }
+    handles.clear();
+
+    ASSERT_OK(snapshotDB->Get(r_opts, key1, &result));
+    ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound());
+
+    // Write to snapshot
+    for (size_t i = num_keys * 2 + 1; i <= num_keys * 3; i++) {
+      ASSERT_OK(
+          snapshotDB->Put(w_opts, IntToFixedWidthString(i, key_len), value));
+    }
+
+    ASSERT_OK(snapshotDB->Get(r_opts, key3, &result));
+    ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound());
+
+    // Close and reopen the snapshot
+    delete snapshotDB;
+    ASSERT_OK(
+        DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB));
+    for (ColumnFamilyHandle* handle : handles) {
+      delete handle;
+    }
+    handles.clear();
+    ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound());
+    ASSERT_OK(snapshotDB->Get(r_opts, key3, &result));
+
+    delete snapshotDB;
+
+    // Close and reopen the source db
+    delete db_;
+    src_opts.create_if_missing = false;
+    ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
+    ASSERT_OK(db_->Get(r_opts, key2, &result));
+    ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound());
+    delete db_;
+
+    // Delete the snapshot
+    Options del_opts;
+    del_opts.db_log_dir = snap_opts.db_log_dir;
+    del_opts.wal_dir = snap_opts.wal_dir;
+    ASSERT_OK(DestroyDB(snapshot_name_, del_opts));
+
+    // Reopen the source db again
+    ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
+
+    delete db_;
+    db_ = nullptr;
+    ASSERT_OK(DestroyDB(dbname_, src_opts));
+
+    dbname_ = test::PerThreadDBPath(env_, "db_test");
+
+    delete checkpoint;
+  }
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {