git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Reunite checkpoint and backup core logic
author: Andrew Kryczka <andrewkr@fb.com>
Mon, 24 Apr 2017 21:57:27 +0000 (14:57 -0700)
committer: Facebook Github Bot <facebook-github-bot@users.noreply.github.com>
Mon, 24 Apr 2017 22:06:46 +0000 (15:06 -0700)
Summary:
These code paths forked when checkpoint was introduced by copy/pasting the core backup logic. Over time they diverged and bug fixes were sometimes applied to one but not the other (like fix to include all relevant WALs for 2PC), or it required extra effort to fix both (like fix to forge CURRENT file). This diff reunites the code paths by extracting the core logic into a function, CreateCustomCheckpoint(), that is customizable via callbacks to implement both checkpoint and backup.

Related changes:

- flush_before_backup is now forcibly enabled when 2PC is enabled
- Extracted CheckpointImpl class definition into a header file. This is so the function, CreateCustomCheckpoint(), can be called by internal rocksdb code but not exposed to users.
- Implemented more functions in DummyDB/DummyLogFile (in backupable_db_test.cc) that are used by CreateCustomCheckpoint().
Closes https://github.com/facebook/rocksdb/pull/1932

Differential Revision: D4622986

Pulled By: ajkr

fbshipit-source-id: 157723884236ee3999a682673b64f7457a7a0d87

CMakeLists.txt
TARGETS
include/rocksdb/utilities/backupable_db.h
src.mk
utilities/backupable/backupable_db.cc
utilities/backupable/backupable_db_test.cc
utilities/checkpoint/checkpoint.cc [deleted file]
utilities/checkpoint/checkpoint_impl.cc [new file with mode: 0644]
utilities/checkpoint/checkpoint_impl.h [new file with mode: 0644]

index c015454a9e575fe147b0bd38ec58dda367b1261b..b26b1a39eb61af65d6ac00e9fc56ffff71af1298 100644 (file)
@@ -446,7 +446,7 @@ set(SOURCES
         util/xxhash.cc
         utilities/backupable/backupable_db.cc
         utilities/blob_db/blob_db.cc
-        utilities/checkpoint/checkpoint.cc
+        utilities/checkpoint/checkpoint_impl.cc
         utilities/col_buf_decoder.cc
         utilities/col_buf_encoder.cc
         utilities/column_aware_encoding_util.cc
diff --git a/TARGETS b/TARGETS
index 728b023ef945d05beede0b5eb04c0d2862c426d0..715110ba54e0f7bc74452b6cb73e988105f438b7 100644 (file)
--- a/TARGETS
+++ b/TARGETS
@@ -195,7 +195,7 @@ cpp_library(
       "util/xxhash.cc",
       "utilities/backupable/backupable_db.cc",
       "utilities/blob_db/blob_db.cc",
-      "utilities/checkpoint/checkpoint.cc",
+      "utilities/checkpoint/checkpoint_impl.cc",
       "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
       "utilities/convenience/info_log_finder.cc",
       "utilities/date_tiered/date_tiered_db_impl.cc",
index d80601c033bf7ba122c3d8ef02240d6e12d3d204..071f3e480dcf46c695e54452e1ae0d9171dddc85 100644 (file)
@@ -256,12 +256,14 @@ class BackupEngine {
                      BackupEngine** backup_engine_ptr);
 
   // same as CreateNewBackup, but stores extra application metadata
+  // Flush will always trigger if 2PC is enabled.
   virtual Status CreateNewBackupWithMetadata(
       DB* db, const std::string& app_metadata, bool flush_before_backup = false,
       std::function<void()> progress_callback = []() {}) = 0;
 
   // Captures the state of the database in the latest backup
   // NOT a thread safe call
+  // Flush will always trigger if 2PC is enabled.
   virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false,
                                  std::function<void()> progress_callback =
                                      []() {}) {
diff --git a/src.mk b/src.mk
index 6f4303cc2f49a36538daae9d8ca1dab50ebcf635..39f4218d051990ab24248136d7cc332e10b53515 100644 (file)
--- a/src.mk
+++ b/src.mk
@@ -150,7 +150,7 @@ LIB_SOURCES =                                                   \
   util/xxhash.cc                                                \
   utilities/backupable/backupable_db.cc                         \
   utilities/blob_db/blob_db.cc                                  \
-  utilities/checkpoint/checkpoint.cc                            \
+  utilities/checkpoint/checkpoint_impl.cc                       \
   utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc    \
   utilities/convenience/info_log_finder.cc                      \
   utilities/date_tiered/date_tiered_db_impl.cc                  \
index 4e5aff20a22391fed78df6c55af9c8a3a7d1025f..c40c781796427fb021d943b56add2c8f24923210 100644 (file)
@@ -21,6 +21,7 @@
 #include "util/logging.h"
 #include "util/string_util.h"
 #include "util/sync_point.h"
+#include "utilities/checkpoint/checkpoint_impl.h"
 
 #ifndef __STDC_FORMAT_MACROS
 #define __STDC_FORMAT_MACROS
@@ -702,28 +703,6 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
   if (app_metadata.size() > kMaxAppMetaSize) {
     return Status::InvalidArgument("App metadata too large");
   }
-  Status s;
-  std::vector<std::string> live_files;
-  VectorLogPtr live_wal_files;
-  uint64_t manifest_file_size = 0;
-  uint64_t sequence_number = db->GetLatestSequenceNumber();
-
-  s = db->DisableFileDeletions();
-  if (s.ok()) {
-    // this will return live_files prefixed with "/"
-    s = db->GetLiveFiles(live_files, &manifest_file_size, flush_before_backup);
-  }
-  // if we didn't flush before backup, we need to also get WAL files
-  if (s.ok() && !flush_before_backup && options_.backup_log_files) {
-    // returns file names prefixed with "/"
-    s = db->GetSortedWalFiles(live_wal_files);
-  }
-  if (!s.ok()) {
-    db->EnableFileDeletions(false);
-    return s;
-  }
-  TEST_SYNC_POINT("BackupEngineImpl::CreateNewBackup:SavedLiveFiles1");
-  TEST_SYNC_POINT("BackupEngineImpl::CreateNewBackup:SavedLiveFiles2");
 
   BackupID new_backup_id = latest_backup_id_ + 1;
 
@@ -735,7 +714,6 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
   assert(ret.second == true);
   auto& new_backup = ret.first->second;
   new_backup->RecordTimestamp();
-  new_backup->SetSequenceNumber(sequence_number);
   new_backup->SetAppMetadata(app_metadata);
 
   auto start_backup = backup_env_-> NowMicros();
@@ -745,7 +723,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
                  new_backup_id);
 
   auto private_tmp_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id, true));
-  s = backup_env_->FileExists(private_tmp_dir);
+  Status s = backup_env_->FileExists(private_tmp_dir);
   if (s.ok()) {
     // maybe last backup failed and left partial state behind, clean it up
     s = GarbageCollect();
@@ -767,81 +745,53 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
   // This is used to check whether a live files shares a dst_path with another
   // live file.
   std::unordered_set<std::string> live_dst_paths;
-  live_dst_paths.reserve(live_files.size() + live_wal_files.size());
-
-  // Pre-fetch sizes for data files
-  std::unordered_map<std::string, uint64_t> data_path_to_size;
-  if (s.ok()) {
-    s = InsertPathnameToSizeBytes(db->GetName(), db_env_, &data_path_to_size);
-  }
 
   std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
   // Add a CopyOrCreateWorkItem to the channel for each live file
-  std::string manifest_fname, current_fname;
-  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
-    uint64_t number;
-    FileType type;
-    bool ok = ParseFileName(live_files[i], &number, &type);
-    if (!ok) {
-      assert(false);
-      return Status::Corruption("Can't parse file name. This is very bad");
-    }
-    // we should only get sst, manifest and current files here
-    assert(type == kTableFile || type == kDescriptorFile ||
-           type == kCurrentFile || type == kOptionsFile);
-
-    if (type == kCurrentFile) {
-      // We will craft the current file manually to ensure it's consistent with
-      // the manifest number. This is necessary because current's file contents
-      // can change during backup.
-      current_fname = live_files[i];
-      continue;
-    } else if (type == kDescriptorFile) {
-      manifest_fname = live_files[i];
-    }
-
-    auto data_path_to_size_iter =
-        data_path_to_size.find(db->GetName() + live_files[i]);
-    uint64_t size_bytes = data_path_to_size_iter == data_path_to_size.end()
-                              ? port::kMaxUint64
-                              : data_path_to_size_iter->second;
-
-    // rules:
-    // * if it's kTableFile, then it's shared
-    // * if it's kDescriptorFile, limit the size to manifest_file_size
-    s = AddBackupFileWorkItem(
-        live_dst_paths, backup_items_to_finish, new_backup_id,
-        options_.share_table_files && type == kTableFile, db->GetName(),
-        live_files[i], rate_limiter, size_bytes,
-        (type == kDescriptorFile) ? manifest_file_size : 0,
-        options_.share_files_with_checksum && type == kTableFile,
-        progress_callback);
-  }
-  if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
-    // Write the current file with the manifest filename as its contents.
-    s = AddBackupFileWorkItem(
-        live_dst_paths, backup_items_to_finish, new_backup_id,
-        false /* shared */, "" /* src_dir */, CurrentFileName(""), rate_limiter,
-        manifest_fname.size(), 0 /* size_limit */, false /* shared_checksum */,
-        progress_callback, manifest_fname.substr(1) + "\n");
-  }
-  ROCKS_LOG_INFO(options_.info_log,
-                 "begin add wal files for backup -- %" ROCKSDB_PRIszt,
-                 live_wal_files.size());
-  // Add a CopyOrCreateWorkItem to the channel for each WAL file
-  for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
-    uint64_t size_bytes = live_wal_files[i]->SizeFileBytes();
-    if (live_wal_files[i]->Type() == kAliveLogFile) {
-      ROCKS_LOG_INFO(options_.info_log,
-                     "add wal file for backup %s -- %" PRIu64,
-                     live_wal_files[i]->PathName().c_str(), size_bytes);
-      // we only care about live log files
-      // copy the file into backup_dir/files/<new backup>/
-      s = AddBackupFileWorkItem(live_dst_paths, backup_items_to_finish,
-                                new_backup_id, false, /* not shared */
-                                db->GetOptions().wal_dir,
-                                live_wal_files[i]->PathName(), rate_limiter,
-                                size_bytes, size_bytes);
+  db->DisableFileDeletions();
+  if (s.ok()) {
+    CheckpointImpl checkpoint(db);
+    uint64_t sequence_number = 0;
+    s = checkpoint.CreateCustomCheckpoint(
+        db->GetDBOptions(),
+        [&](const std::string& src_dirname, const std::string& fname,
+            FileType) {
+          // custom checkpoint will switch to calling copy_file_cb after it sees
+          // NotSupported returned from link_file_cb.
+          return Status::NotSupported();
+        } /* link_file_cb */,
+        [&](const std::string& src_dirname, const std::string& fname,
+            uint64_t size_limit_bytes, FileType type) {
+          if (type == kLogFile && !options_.backup_log_files) {
+            return Status::OK();
+          }
+          Log(options_.info_log, "add file for backup %s", fname.c_str());
+          uint64_t size_bytes = 0;
+          Status st;
+          if (type == kTableFile) {
+            st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
+          }
+          if (st.ok()) {
+            st = AddBackupFileWorkItem(
+                live_dst_paths, backup_items_to_finish, new_backup_id,
+                options_.share_table_files && type == kTableFile, src_dirname,
+                fname, rate_limiter, size_bytes, size_limit_bytes,
+                options_.share_files_with_checksum && type == kTableFile,
+                progress_callback);
+          }
+          return st;
+        } /* copy_file_cb */,
+        [&](const std::string& fname, const std::string& contents, FileType) {
+          Log(options_.info_log, "add file for backup %s", fname.c_str());
+          return AddBackupFileWorkItem(
+              live_dst_paths, backup_items_to_finish, new_backup_id,
+              false /* shared */, "" /* src_dir */, fname, rate_limiter,
+              contents.size(), 0 /* size_limit */, false /* shared_checksum */,
+              progress_callback, contents);
+        } /* create_file_cb */,
+        &sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
+    if (s.ok()) {
+      new_backup->SetSequenceNumber(sequence_number);
     }
   }
   ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
index 507b6fbb6035acb66576e00be5cef05001598edc..6e6e38d3fb7e302dea4612db4e8fbc5e50b5e3f0 100644 (file)
@@ -61,6 +61,10 @@ class DummyDB : public StackableDB {
     return options_;
   }
 
+  virtual DBOptions GetDBOptions() const override {
+    return DBOptions(options_);
+  }
+
   virtual Status EnableFileDeletions(bool force) override {
     EXPECT_TRUE(!deletions_enabled_);
     deletions_enabled_ = true;
@@ -106,9 +110,9 @@ class DummyDB : public StackableDB {
     }
 
     virtual SequenceNumber StartSequence() const override {
-      // backupabledb should not need this method
-      EXPECT_TRUE(false);
-      return 0;
+      // this seqnum guarantees the dummy file will be included in the backup
+      // as long as it is alive.
+      return kMaxSequenceNumber;
     }
 
     virtual uint64_t SizeFileBytes() const override {
@@ -204,6 +208,14 @@ class TestEnv : public EnvWrapper {
     return EnvWrapper::DeleteFile(fname);
   }
 
+  virtual Status DeleteDir(const std::string& dirname) override {
+    MutexLock l(&mutex_);
+    if (fail_delete_files_) {
+      return Status::IOError();
+    }
+    return EnvWrapper::DeleteDir(dirname);
+  }
+
   void AssertWrittenFiles(std::vector<std::string>& should_have_written) {
     MutexLock l(&mutex_);
     std::sort(should_have_written.begin(), should_have_written.end());
@@ -266,6 +278,19 @@ class TestEnv : public EnvWrapper {
     }
     return EnvWrapper::GetChildrenFileAttributes(dir, r);
   }
+  Status GetFileSize(const std::string& path, uint64_t* size_bytes) override {
+    if (filenames_for_mocked_attrs_.size() > 0) {
+      auto fname = path.substr(path.find_last_of('/'));
+      auto filename_iter = std::find(filenames_for_mocked_attrs_.begin(),
+                                     filenames_for_mocked_attrs_.end(), fname);
+      if (filename_iter != filenames_for_mocked_attrs_.end()) {
+        *size_bytes = 10;
+        return Status::OK();
+      }
+      return Status::NotFound(fname);
+    }
+    return EnvWrapper::GetFileSize(path, size_bytes);
+  }
 
   void SetCreateDirIfMissingFailure(bool fail) {
     create_dir_if_missing_failure_ = fail;
@@ -1374,10 +1399,10 @@ TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) {
   FillDB(db_.get(), 0, 100);
 
   rocksdb::SyncPoint::GetInstance()->LoadDependency({
-      {"BackupEngineImpl::CreateNewBackup:SavedLiveFiles1",
+      {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
        "VersionSet::LogAndApply:WriteManifest"},
       {"VersionSet::LogAndApply:WriteManifestDone",
-       "BackupEngineImpl::CreateNewBackup:SavedLiveFiles2"},
+       "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"},
   });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
diff --git a/utilities/checkpoint/checkpoint.cc b/utilities/checkpoint/checkpoint.cc
deleted file mode 100644 (file)
index fc9e16f..0000000
+++ /dev/null
@@ -1,297 +0,0 @@
-//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
-//
-// Copyright (c) 2012 Facebook.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef ROCKSDB_LITE
-
-#include "rocksdb/utilities/checkpoint.h"
-
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
-#include <algorithm>
-#include <string>
-#include "db/wal_manager.h"
-#include "port/port.h"
-#include "rocksdb/db.h"
-#include "rocksdb/env.h"
-#include "rocksdb/transaction_log.h"
-#include "util/file_util.h"
-#include "util/filename.h"
-#include "util/sync_point.h"
-
-namespace rocksdb {
-
-class CheckpointImpl : public Checkpoint {
- public:
-  // Creates a Checkpoint object to be used for creating openable snapshots
-  explicit CheckpointImpl(DB* db) : db_(db) {}
-
-  // Builds an openable snapshot of RocksDB on the same disk, which
-  // accepts an output directory on the same disk, and under the directory
-  // (1) hard-linked SST files pointing to existing live SST files
-  // SST files will be copied if output directory is on a different filesystem
-  // (2) a copied manifest files and other files
-  // The directory should not already exist and will be created by this API.
-  // The directory will be an absolute path
-  using Checkpoint::CreateCheckpoint;
-  virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
-                                  uint64_t log_size_for_flush) override;
-
- private:
-  DB* db_;
-};
-
-Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
-  *checkpoint_ptr = new CheckpointImpl(db);
-  return Status::OK();
-}
-
-Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir,
-                                    uint64_t log_size_for_flush) {
-  return Status::NotSupported("");
-}
-
-// Builds an openable snapshot of RocksDB
-Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
-                                        uint64_t log_size_for_flush) {
-  Status s;
-  std::vector<std::string> live_files;
-  uint64_t manifest_file_size = 0;
-  DBOptions db_options = db_->GetDBOptions();
-  uint64_t min_log_num = port::kMaxUint64;
-  uint64_t sequence_number = db_->GetLatestSequenceNumber();
-  bool same_fs = true;
-  VectorLogPtr live_wal_files;
-
-  s = db_->GetEnv()->FileExists(checkpoint_dir);
-  if (s.ok()) {
-    return Status::InvalidArgument("Directory exists");
-  } else if (!s.IsNotFound()) {
-    assert(s.IsIOError());
-    return s;
-  }
-
-  s = db_->DisableFileDeletions();
-  bool flush_memtable = true;
-  if (s.ok()) {
-    if (!db_options.allow_2pc) {
-      // If out standing log files are small, we skip the flush.
-      s = db_->GetSortedWalFiles(live_wal_files);
-
-      if (!s.ok()) {
-        db_->EnableFileDeletions(false);
-        return s;
-      }
-
-      // Don't flush column families if total log size is smaller than
-      // log_size_for_flush. We copy the log files instead.
-      // We may be able to cover 2PC case too.
-      uint64_t total_wal_size = 0;
-      for (auto& wal : live_wal_files) {
-        total_wal_size += wal->SizeFileBytes();
-      }
-      if (total_wal_size < log_size_for_flush) {
-        flush_memtable = false;
-      }
-      live_wal_files.clear();
-    }
-
-    // this will return live_files prefixed with "/"
-    s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
-
-    if (s.ok() && db_options.allow_2pc) {
-      // If 2PC is enabled, we need to get minimum log number after the flush.
-      // Need to refetch the live files to recapture the snapshot.
-      if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
-                               &min_log_num)) {
-        db_->EnableFileDeletions(false);
-        return Status::InvalidArgument(
-            "2PC enabled but cannot fine the min log number to keep.");
-      }
-      // We need to refetch live files with flush to handle this case:
-      // A previous 000001.log contains the prepare record of transaction tnx1.
-      // The current log file is 000002.log, and sequence_number points to this
-      // file.
-      // After calling GetLiveFiles(), 000003.log is created.
-      // Then tnx1 is committed. The commit record is written to 000003.log.
-      // Now we fetch min_log_num, which will be 3.
-      // Then only 000002.log and 000003.log will be copied, and 000001.log will
-      // be skipped. 000003.log contains commit message of tnx1, but we don't
-      // have respective prepare record for it.
-      // In order to avoid this situation, we need to force flush to make sure
-      // all transactions commited before getting min_log_num will be flushed
-      // to SST files.
-      // We cannot get min_log_num before calling the GetLiveFiles() for the
-      // first time, because if we do that, all the logs files will be included,
-      // far more than needed.
-      s = db_->GetLiveFiles(live_files, &manifest_file_size, /* flush */ true);
-    }
-
-    TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
-    TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
-  }
-  // if we have more than one column family, we need to also get WAL files
-  if (s.ok()) {
-    s = db_->GetSortedWalFiles(live_wal_files);
-  }
-  if (!s.ok()) {
-    db_->EnableFileDeletions(false);
-    return s;
-  }
-
-  size_t wal_size = live_wal_files.size();
-  ROCKS_LOG_INFO(
-      db_options.info_log,
-      "Started the snapshot process -- creating snapshot in directory %s",
-      checkpoint_dir.c_str());
-
-  std::string full_private_path = checkpoint_dir + ".tmp";
-
-  // create snapshot directory
-  s = db_->GetEnv()->CreateDir(full_private_path);
-
-  // copy/hard link live_files
-  std::string manifest_fname, current_fname;
-  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
-    uint64_t number;
-    FileType type;
-    bool ok = ParseFileName(live_files[i], &number, &type);
-    if (!ok) {
-      s = Status::Corruption("Can't parse file name. This is very bad");
-      break;
-    }
-    // we should only get sst, options, manifest and current files here
-    assert(type == kTableFile || type == kDescriptorFile ||
-           type == kCurrentFile || type == kOptionsFile);
-    assert(live_files[i].size() > 0 && live_files[i][0] == '/');
-    if (type == kCurrentFile) {
-      // We will craft the current file manually to ensure it's consistent with
-      // the manifest number. This is necessary because current's file contents
-      // can change during checkpoint creation.
-      current_fname = live_files[i];
-      continue;
-    } else if (type == kDescriptorFile) {
-      manifest_fname = live_files[i];
-    }
-    std::string src_fname = live_files[i];
-
-    // rules:
-    // * if it's kTableFile, then it's shared
-    // * if it's kDescriptorFile, limit the size to manifest_file_size
-    // * always copy if cross-device link
-    if ((type == kTableFile) && same_fs) {
-      ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", src_fname.c_str());
-      s = db_->GetEnv()->LinkFile(db_->GetName() + src_fname,
-                                  full_private_path + src_fname);
-      if (s.IsNotSupported()) {
-        same_fs = false;
-        s = Status::OK();
-      }
-    }
-    if ((type != kTableFile) || (!same_fs)) {
-      ROCKS_LOG_INFO(db_options.info_log, "Copying %s", src_fname.c_str());
-      s = CopyFile(db_->GetEnv(), db_->GetName() + src_fname,
-                   full_private_path + src_fname,
-                   (type == kDescriptorFile) ? manifest_file_size : 0,
-                   db_options.use_fsync);
-    }
-  }
-  if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
-    s = CreateFile(db_->GetEnv(), full_private_path + current_fname,
-                   manifest_fname.substr(1) + "\n");
-  }
-  ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
-                 live_wal_files.size());
-
-  // Link WAL files. Copy exact size of last one because it is the only one
-  // that has changes after the last flush.
-  for (size_t i = 0; s.ok() && i < wal_size; ++i) {
-    if ((live_wal_files[i]->Type() == kAliveLogFile) &&
-        (!flush_memtable ||
-         live_wal_files[i]->StartSequence() >= sequence_number ||
-         live_wal_files[i]->LogNumber() >= min_log_num)) {
-      if (i + 1 == wal_size) {
-        ROCKS_LOG_INFO(db_options.info_log, "Copying %s",
-                       live_wal_files[i]->PathName().c_str());
-        s = CopyFile(db_->GetEnv(),
-                     db_options.wal_dir + live_wal_files[i]->PathName(),
-                     full_private_path + live_wal_files[i]->PathName(),
-                     live_wal_files[i]->SizeFileBytes(), db_options.use_fsync);
-        break;
-      }
-      if (same_fs) {
-        // we only care about live log files
-        ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s",
-                       live_wal_files[i]->PathName().c_str());
-        s = db_->GetEnv()->LinkFile(
-            db_options.wal_dir + live_wal_files[i]->PathName(),
-            full_private_path + live_wal_files[i]->PathName());
-        if (s.IsNotSupported()) {
-          same_fs = false;
-          s = Status::OK();
-        }
-      }
-      if (!same_fs) {
-        ROCKS_LOG_INFO(db_options.info_log, "Copying %s",
-                       live_wal_files[i]->PathName().c_str());
-        s = CopyFile(db_->GetEnv(),
-                     db_options.wal_dir + live_wal_files[i]->PathName(),
-                     full_private_path + live_wal_files[i]->PathName(), 0,
-                     db_options.use_fsync);
-      }
-    }
-  }
-
-  // we copied all the files, enable file deletions
-  db_->EnableFileDeletions(false);
-
-  if (s.ok()) {
-    // move tmp private backup to real snapshot directory
-    s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
-  }
-  if (s.ok()) {
-    unique_ptr<Directory> checkpoint_directory;
-    db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
-    if (checkpoint_directory != nullptr) {
-      s = checkpoint_directory->Fsync();
-    }
-  }
-
-  if (!s.ok()) {
-    // clean all the files we might have created
-    ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
-                   s.ToString().c_str());
-    // we have to delete the dir and all its children
-    std::vector<std::string> subchildren;
-    db_->GetEnv()->GetChildren(full_private_path, &subchildren);
-    for (auto& subchild : subchildren) {
-      std::string subchild_path = full_private_path + "/" + subchild;
-      Status s1 = db_->GetEnv()->DeleteFile(subchild_path);
-      ROCKS_LOG_INFO(db_options.info_log, "Delete file %s -- %s",
-                     subchild_path.c_str(), s1.ToString().c_str());
-    }
-    // finally delete the private dir
-    Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
-    ROCKS_LOG_INFO(db_options.info_log, "Delete dir %s -- %s",
-                   full_private_path.c_str(), s1.ToString().c_str());
-    return s;
-  }
-
-  // here we know that we succeeded and installed the new snapshot
-  ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
-  ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64,
-                 sequence_number);
-
-  return s;
-}
-}  // namespace rocksdb
-
-#endif  // ROCKSDB_LITE
diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc
new file mode 100644 (file)
index 0000000..3242946
--- /dev/null
@@ -0,0 +1,303 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under the BSD-style license found in the
+//  LICENSE file in the root directory of this source tree. An additional grant
+//  of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2012 Facebook.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/checkpoint/checkpoint_impl.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <algorithm>
+#include <string>
+#include <vector>
+
+#include "db/wal_manager.h"
+#include "port/port.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/transaction_log.h"
+#include "rocksdb/utilities/checkpoint.h"
+#include "util/file_util.h"
+#include "util/filename.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
+  // Factory: hands back the concrete CheckpointImpl bound to `db`.
+  // The caller owns the returned object and must delete it.
+  *checkpoint_ptr = new CheckpointImpl(db);
+  return Status::OK();
+}
+
+Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir,
+                                    uint64_t log_size_for_flush) {
+  // Base-class stub: checkpoints can only be built through the
+  // CheckpointImpl object obtained via Checkpoint::Create().
+  return Status::NotSupported("");
+}
+
+// Builds an openable snapshot of RocksDB
+Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
+                                        uint64_t log_size_for_flush) {
+  // Orchestrates checkpoint creation: stage all files in "<dir>.tmp" via
+  // CreateCustomCheckpoint() (hard-linking SSTs, copying mutable files),
+  // then atomically rename the staging dir to checkpoint_dir and fsync it.
+  // On any failure the staging directory and its contents are deleted.
+  DBOptions db_options = db_->GetDBOptions();
+
+  // Refuse to overwrite an existing directory; anything other than
+  // "not found" from FileExists() is treated as a real error.
+  Status s = db_->GetEnv()->FileExists(checkpoint_dir);
+  if (s.ok()) {
+    return Status::InvalidArgument("Directory exists");
+  } else if (!s.IsNotFound()) {
+    assert(s.IsIOError());
+    return s;
+  }
+
+  ROCKS_LOG_INFO(
+      db_options.info_log,
+      "Started the snapshot process -- creating snapshot in directory %s",
+      checkpoint_dir.c_str());
+  std::string full_private_path = checkpoint_dir + ".tmp";
+  // create snapshot directory
+  s = db_->GetEnv()->CreateDir(full_private_path);
+  uint64_t sequence_number = 0;
+  if (s.ok()) {
+    // Files must not be garbage-collected while we link/copy them.
+    db_->DisableFileDeletions();
+    s = CreateCustomCheckpoint(
+        db_options,
+        [&](const std::string& src_dirname, const std::string& fname,
+            FileType) {
+          ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str());
+          return db_->GetEnv()->LinkFile(src_dirname + fname,
+                                         full_private_path + fname);
+        } /* link_file_cb */,
+        [&](const std::string& src_dirname, const std::string& fname,
+            uint64_t size_limit_bytes, FileType) {
+          ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
+          return CopyFile(db_->GetEnv(), src_dirname + fname,
+                          full_private_path + fname, size_limit_bytes,
+                          db_options.use_fsync);
+        } /* copy_file_cb */,
+        [&](const std::string& fname, const std::string& contents, FileType) {
+          ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
+          return CreateFile(db_->GetEnv(), full_private_path + fname, contents);
+        } /* create_file_cb */,
+        &sequence_number, log_size_for_flush);
+    // we copied all the files, enable file deletions
+    // NOTE(review): EnableFileDeletions' status is ignored here — confirm
+    // that is intentional (a failure would leave deletions disabled).
+    db_->EnableFileDeletions(false);
+  }
+
+  if (s.ok()) {
+    // move tmp private backup to real snapshot directory
+    s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
+  }
+  if (s.ok()) {
+    // Fsync the final directory so the rename is durable. A null directory
+    // handle (NewDirectory failure) is tolerated and simply skips the fsync.
+    unique_ptr<Directory> checkpoint_directory;
+    db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
+    if (checkpoint_directory != nullptr) {
+      s = checkpoint_directory->Fsync();
+    }
+  }
+
+  if (s.ok()) {
+    // here we know that we succeeded and installed the new snapshot
+    ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
+    ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64,
+                   sequence_number);
+  } else {
+    // clean all the files we might have created
+    ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
+                   s.ToString().c_str());
+    // we have to delete the dir and all its children
+    // Best-effort cleanup: per-file delete failures are logged, not returned.
+    std::vector<std::string> subchildren;
+    db_->GetEnv()->GetChildren(full_private_path, &subchildren);
+    for (auto& subchild : subchildren) {
+      std::string subchild_path = full_private_path + "/" + subchild;
+      Status s1 = db_->GetEnv()->DeleteFile(subchild_path);
+      ROCKS_LOG_INFO(db_options.info_log, "Delete file %s -- %s",
+                     subchild_path.c_str(), s1.ToString().c_str());
+    }
+    // finally delete the private dir
+    Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
+    ROCKS_LOG_INFO(db_options.info_log, "Delete dir %s -- %s",
+                   full_private_path.c_str(), s1.ToString().c_str());
+  }
+  return s;
+}
+
+Status CheckpointImpl::CreateCustomCheckpoint(
+    const DBOptions& db_options,
+    std::function<Status(const std::string& src_dirname,
+                         const std::string& src_fname, FileType type)>
+        link_file_cb,
+    std::function<Status(const std::string& src_dirname,
+                         const std::string& src_fname,
+                         uint64_t size_limit_bytes, FileType type)>
+        copy_file_cb,
+    std::function<Status(const std::string& fname, const std::string& contents,
+                         FileType type)>
+        create_file_cb,
+    uint64_t* sequence_number, uint64_t log_size_for_flush) {
+  Status s;
+  std::vector<std::string> live_files;
+  uint64_t manifest_file_size = 0;
+  uint64_t min_log_num = port::kMaxUint64;
+  *sequence_number = db_->GetLatestSequenceNumber();
+  bool same_fs = true;
+  VectorLogPtr live_wal_files;
+
+  bool flush_memtable = true;
+  if (s.ok()) {
+    if (!db_options.allow_2pc) {
+      if (log_size_for_flush == port::kMaxUint64) {
+        flush_memtable = false;
+      } else if (log_size_for_flush > 0) {
+        // If out standing log files are small, we skip the flush.
+        s = db_->GetSortedWalFiles(live_wal_files);
+
+        if (!s.ok()) {
+          return s;
+        }
+
+        // Don't flush column families if total log size is smaller than
+        // log_size_for_flush. We copy the log files instead.
+        // We may be able to cover 2PC case too.
+        uint64_t total_wal_size = 0;
+        for (auto& wal : live_wal_files) {
+          total_wal_size += wal->SizeFileBytes();
+        }
+        if (total_wal_size < log_size_for_flush) {
+          flush_memtable = false;
+        }
+        live_wal_files.clear();
+      }
+    }
+
+    // this will return live_files prefixed with "/"
+    s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
+
+    if (s.ok() && db_options.allow_2pc) {
+      // If 2PC is enabled, we need to get minimum log number after the flush.
+      // Need to refetch the live files to recapture the snapshot.
+      if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
+                               &min_log_num)) {
+        return Status::InvalidArgument(
+            "2PC enabled but cannot fine the min log number to keep.");
+      }
+      // We need to refetch live files with flush to handle this case:
+      // A previous 000001.log contains the prepare record of transaction tnx1.
+      // The current log file is 000002.log, and sequence_number points to this
+      // file.
+      // After calling GetLiveFiles(), 000003.log is created.
+      // Then tnx1 is committed. The commit record is written to 000003.log.
+      // Now we fetch min_log_num, which will be 3.
+      // Then only 000002.log and 000003.log will be copied, and 000001.log will
+      // be skipped. 000003.log contains commit message of tnx1, but we don't
+      // have respective prepare record for it.
+      // In order to avoid this situation, we need to force flush to make sure
+      // all transactions committed before getting min_log_num will be flushed
+      // to SST files.
+      // We cannot get min_log_num before calling the GetLiveFiles() for the
+      // first time, because if we do that, all the logs files will be included,
+      // far more than needed.
+      s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
+    }
+
+    TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
+    TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
+  }
+  // if we have more than one column family, we need to also get WAL files
+  if (s.ok()) {
+    s = db_->GetSortedWalFiles(live_wal_files);
+  }
+  if (!s.ok()) {
+    return s;
+  }
+
+  size_t wal_size = live_wal_files.size();
+
+  // copy/hard link live_files
+  std::string manifest_fname, current_fname;
+  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
+    uint64_t number;
+    FileType type;
+    bool ok = ParseFileName(live_files[i], &number, &type);
+    if (!ok) {
+      s = Status::Corruption("Can't parse file name. This is very bad");
+      break;
+    }
+    // we should only get sst, options, manifest and current files here
+    assert(type == kTableFile || type == kDescriptorFile ||
+           type == kCurrentFile || type == kOptionsFile);
+    assert(live_files[i].size() > 0 && live_files[i][0] == '/');
+    if (type == kCurrentFile) {
+      // We will craft the current file manually to ensure it's consistent with
+      // the manifest number. This is necessary because current's file contents
+      // can change during checkpoint creation.
+      current_fname = live_files[i];
+      continue;
+    } else if (type == kDescriptorFile) {
+      manifest_fname = live_files[i];
+    }
+    std::string src_fname = live_files[i];
+
+    // rules:
+    // * if it's kTableFile, then it's shared
+    // * if it's kDescriptorFile, limit the size to manifest_file_size
+    // * always copy if cross-device link
+    if ((type == kTableFile) && same_fs) {
+      s = link_file_cb(db_->GetName(), src_fname, type);
+      if (s.IsNotSupported()) {
+        same_fs = false;
+        s = Status::OK();
+      }
+    }
+    if ((type != kTableFile) || (!same_fs)) {
+      s = copy_file_cb(db_->GetName(), src_fname,
+                       (type == kDescriptorFile) ? manifest_file_size : 0,
+                       type);
+    }
+  }
+  if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
+    create_file_cb(current_fname, manifest_fname.substr(1) + "\n",
+                   kCurrentFile);
+  }
+  ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
+                 live_wal_files.size());
+
+  // Link WAL files. Copy exact size of last one because it is the only one
+  // that has changes after the last flush.
+  for (size_t i = 0; s.ok() && i < wal_size; ++i) {
+    if ((live_wal_files[i]->Type() == kAliveLogFile) &&
+        (!flush_memtable ||
+         live_wal_files[i]->StartSequence() >= *sequence_number ||
+         live_wal_files[i]->LogNumber() >= min_log_num)) {
+      if (i + 1 == wal_size) {
+        s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
+                         live_wal_files[i]->SizeFileBytes(), kLogFile);
+        break;
+      }
+      if (same_fs) {
+        // we only care about live log files
+        s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
+                         kLogFile);
+        if (s.IsNotSupported()) {
+          same_fs = false;
+          s = Status::OK();
+        }
+      }
+      if (!same_fs) {
+        s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0,
+                         kLogFile);
+      }
+    }
+  }
+
+  return s;
+}
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE
diff --git a/utilities/checkpoint/checkpoint_impl.h b/utilities/checkpoint/checkpoint_impl.h
new file mode 100644 (file)
index 0000000..f364b9e
--- /dev/null
@@ -0,0 +1,55 @@
+//  Copyright (c) 2017-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under the BSD-style license found in the
+//  LICENSE file in the root directory of this source tree. An additional grant
+//  of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include "rocksdb/utilities/checkpoint.h"
+
+#include <string>
+#include "rocksdb/db.h"
+#include "util/filename.h"
+
+namespace rocksdb {
+
+class CheckpointImpl : public Checkpoint {
+ public:
+  // Creates a Checkpoint object to be used for creating openable snapshots
+  explicit CheckpointImpl(DB* db) : db_(db) {}
+
+  // Builds an openable snapshot of RocksDB on the same disk, which
+  // accepts an output directory on the same disk, and under the directory
+  // (1) hard-linked SST files pointing to existing live SST files
+  // (SST files will be copied if the output directory is on a different
+  // filesystem)
+  // (2) copies of the manifest and other required files
+  // The directory should not already exist and will be created by this API.
+  // The directory will be an absolute path
+  using Checkpoint::CreateCheckpoint;
+  virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
+                                  uint64_t log_size_for_flush) override;
+
+  // Checkpoint logic can be customized by providing callbacks for link, copy,
+  // or create.
+  // Not part of the public Checkpoint interface: exposed in this internal
+  // header so the backup engine can reuse the same core logic.
+  Status CreateCustomCheckpoint(
+      const DBOptions& db_options,
+      std::function<Status(const std::string& src_dirname,
+                           const std::string& fname, FileType type)>
+          link_file_cb,
+      std::function<Status(const std::string& src_dirname,
+                           const std::string& fname, uint64_t size_limit_bytes,
+                           FileType type)>
+          copy_file_cb,
+      std::function<Status(const std::string& fname,
+                           const std::string& contents, FileType type)>
+          create_file_cb,
+      uint64_t* sequence_number, uint64_t log_size_for_flush);
+
+ private:
+  // Non-owning handle to the DB being checkpointed.
+  DB* db_;
+};
+
+}  //  namespace rocksdb
+
+#endif  // ROCKSDB_LITE