]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Replace Status with IOStatus in the backupable_db (#8820)
authorZhichao Cao <zhichao@fb.com>
Wed, 15 Sep 2021 22:08:39 +0000 (15:08 -0700)
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>
Wed, 15 Sep 2021 22:09:48 +0000 (15:09 -0700)
Summary:
In order to populate the IOStatus up to the higher level, replace some of the Status to IOStatus.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8820

Test Plan: make check

Reviewed By: pdillinger

Differential Revision: D30967215

Pulled By: zhichao-cao

fbshipit-source-id: ccf9d5cfbd9d3de047c464aaa85f9fa43b474903

file/line_file_reader.cc
file/line_file_reader.h
file/sequence_file_reader.cc
file/sequence_file_reader.h
include/rocksdb/utilities/backup_engine.h
utilities/backupable/backupable_db.cc

index 678abc4c6bd17ad4c5fef5c92d145d65685c2596..af00d77805fbd67635b86574ca9b005adac5d18e 100644 (file)
 
 namespace ROCKSDB_NAMESPACE {
 
-Status LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
-                              const std::string& fname,
-                              const FileOptions& file_opts,
-                              std::unique_ptr<LineFileReader>* reader,
-                              IODebugContext* dbg) {
+IOStatus LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
+                                const std::string& fname,
+                                const FileOptions& file_opts,
+                                std::unique_ptr<LineFileReader>* reader,
+                                IODebugContext* dbg) {
   std::unique_ptr<FSSequentialFile> file;
-  Status s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
-  if (s.ok()) {
+  IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
+  if (io_s.ok()) {
     reader->reset(new LineFileReader(std::move(file), fname));
   }
-  return s;
+  return io_s;
 }
 
 bool LineFileReader::ReadLine(std::string* out) {
   assert(out);
-  if (!status_.ok()) {
+  if (!io_status_.ok()) {
     // Status should be checked (or permit unchecked) any time we return false.
-    status_.MustCheck();
+    io_status_.MustCheck();
     return false;
   }
   out->clear();
@@ -44,16 +44,16 @@ bool LineFileReader::ReadLine(std::string* out) {
       return true;
     }
     if (at_eof_) {
-      status_.MustCheck();
+      io_status_.MustCheck();
       return false;
     }
     // else flush and reload buffer
     out->append(buf_begin_, buf_end_ - buf_begin_);
     Slice result;
-    status_ = sfr_.Read(buf_.size(), &result, buf_.data());
+    io_status_ = sfr_.Read(buf_.size(), &result, buf_.data());
     IOSTATS_ADD(bytes_read, result.size());
-    if (!status_.ok()) {
-      status_.MustCheck();
+    if (!io_status_.ok()) {
+      io_status_.MustCheck();
       return false;
     }
     if (result.size() != buf_.size()) {
index 48d79f327c0e03a039ed794f26f461e4b4caba34..4b4a9d56483eac58ce1debcde9608503d13921e6 100644 (file)
@@ -17,7 +17,7 @@ class LineFileReader {
  private:
   std::array<char, 8192> buf_;
   SequentialFileReader sfr_;
-  Status status_;
+  IOStatus io_status_;
   const char* buf_begin_ = buf_.data();
   const char* buf_end_ = buf_.data();
   size_t line_number_ = 0;
@@ -29,10 +29,10 @@ class LineFileReader {
   explicit LineFileReader(Args&&... args)
       : sfr_(std::forward<Args&&>(args)...) {}
 
-  static Status Create(const std::shared_ptr<FileSystem>& fs,
-                       const std::string& fname, const FileOptions& file_opts,
-                       std::unique_ptr<LineFileReader>* reader,
-                       IODebugContext* dbg);
+  static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
+                         const std::string& fname, const FileOptions& file_opts,
+                         std::unique_ptr<LineFileReader>* reader,
+                         IODebugContext* dbg);
 
   LineFileReader(const LineFileReader&) = delete;
   LineFileReader& operator=(const LineFileReader&) = delete;
@@ -53,7 +53,7 @@ class LineFileReader {
   // Returns any error encountered during read. The error is considered
   // permanent and no retry or recovery is attempted with the same
   // LineFileReader.
-  const Status& GetStatus() const { return status_; }
+  const IOStatus& GetStatus() const { return io_status_; }
 };
 
 }  // namespace ROCKSDB_NAMESPACE
index 3a87b6d102f0dd981f1a76cec92d3f51c42ad652..70a0e0f07718b6c98fdabaf77672505c561f607e 100644 (file)
 #include "util/rate_limiter.h"
 
 namespace ROCKSDB_NAMESPACE {
-Status SequentialFileReader::Create(
+IOStatus SequentialFileReader::Create(
     const std::shared_ptr<FileSystem>& fs, const std::string& fname,
     const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
     IODebugContext* dbg) {
   std::unique_ptr<FSSequentialFile> file;
-  Status s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
-  if (s.ok()) {
+  IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
+  if (io_s.ok()) {
     reader->reset(new SequentialFileReader(std::move(file), fname));
   }
-  return s;
+  return io_s;
 }
 
-Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
-  Status s;
+IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
+  IOStatus io_s;
   if (use_direct_io()) {
 #ifndef ROCKSDB_LITE
     size_t offset = offset_.fetch_add(n);
@@ -48,9 +48,9 @@ Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
     buf.Alignment(alignment);
     buf.AllocateNewBuffer(size);
     Slice tmp;
-    s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
-                              buf.BufferStart(), nullptr);
-    if (s.ok() && offset_advance < tmp.size()) {
+    io_s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
+                                 buf.BufferStart(), nullptr);
+    if (io_s.ok() && offset_advance < tmp.size()) {
       buf.Size(tmp.size());
       r = buf.Read(scratch, offset_advance,
                    std::min(tmp.size() - offset_advance, n));
@@ -58,17 +58,17 @@ Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
     *result = Slice(scratch, r);
 #endif  // !ROCKSDB_LITE
   } else {
-    s = file_->Read(n, IOOptions(), result, scratch, nullptr);
+    io_s = file_->Read(n, IOOptions(), result, scratch, nullptr);
   }
   IOSTATS_ADD(bytes_read, result->size());
-  return s;
+  return io_s;
 }
 
-Status SequentialFileReader::Skip(uint64_t n) {
+IOStatus SequentialFileReader::Skip(uint64_t n) {
 #ifndef ROCKSDB_LITE
   if (use_direct_io()) {
     offset_ += static_cast<size_t>(n);
-    return Status::OK();
+    return IOStatus::OK();
   }
 #endif  // !ROCKSDB_LITE
   return file_->Skip(n);
index ea315f853e2eca9c881fb1178031a65e3d8cd308..72fe37f7df9def96eb7632be9525902e3b2370de 100644 (file)
@@ -41,17 +41,17 @@ class SequentialFileReader {
       : file_name_(_file_name),
         file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size),
               io_tracer, _file_name) {}
-  static Status Create(const std::shared_ptr<FileSystem>& fs,
-                       const std::string& fname, const FileOptions& file_opts,
-                       std::unique_ptr<SequentialFileReader>* reader,
-                       IODebugContext* dbg);
+  static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
+                         const std::string& fname, const FileOptions& file_opts,
+                         std::unique_ptr<SequentialFileReader>* reader,
+                         IODebugContext* dbg);
 
   SequentialFileReader(const SequentialFileReader&) = delete;
   SequentialFileReader& operator=(const SequentialFileReader&) = delete;
 
-  Status Read(size_t n, Slice* result, char* scratch);
+  IOStatus Read(size_t n, Slice* result, char* scratch);
 
-  Status Skip(uint64_t n);
+  IOStatus Skip(uint64_t n);
 
   FSSequentialFile* file() { return file_.get(); }
 
index f508d562c247f69396deaebf9b27e046a24a78a6..c8ad84107b705fb2dcc90a3dc1d78808f9b5230b 100644 (file)
@@ -17,6 +17,7 @@
 #include <vector>
 
 #include "rocksdb/env.h"
+#include "rocksdb/io_status.h"
 #include "rocksdb/options.h"
 #include "rocksdb/status.h"
 
@@ -406,25 +407,25 @@ class BackupEngineReadOnlyBase {
       std::vector<BackupID>* corrupt_backup_ids) const = 0;
 
   // Restore to specified db_dir and wal_dir from backup_id.
-  virtual Status RestoreDBFromBackup(const RestoreOptions& options,
-                                     BackupID backup_id,
-                                     const std::string& db_dir,
-                                     const std::string& wal_dir) const = 0;
+  virtual IOStatus RestoreDBFromBackup(const RestoreOptions& options,
+                                       BackupID backup_id,
+                                       const std::string& db_dir,
+                                       const std::string& wal_dir) const = 0;
 
   // keep for backward compatibility.
-  virtual Status RestoreDBFromBackup(
+  virtual IOStatus RestoreDBFromBackup(
       BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
       const RestoreOptions& options = RestoreOptions()) const {
     return RestoreDBFromBackup(options, backup_id, db_dir, wal_dir);
   }
 
   // Like RestoreDBFromBackup but restores from latest non-corrupt backup_id
-  virtual Status RestoreDBFromLatestBackup(
+  virtual IOStatus RestoreDBFromLatestBackup(
       const RestoreOptions& options, const std::string& db_dir,
       const std::string& wal_dir) const = 0;
 
   // keep for backward compatibility.
-  virtual Status RestoreDBFromLatestBackup(
+  virtual IOStatus RestoreDBFromLatestBackup(
       const std::string& db_dir, const std::string& wal_dir,
       const RestoreOptions& options = RestoreOptions()) const {
     return RestoreDBFromLatestBackup(options, db_dir, wal_dir);
@@ -445,8 +446,8 @@ class BackupEngineReadOnlyBase {
   // their sizes (and checksums) when the BackupEngine was opened.
   //
   // Returns Status::OK() if all checks are good
-  virtual Status VerifyBackup(BackupID backup_id,
-                              bool verify_with_checksum = false) const = 0;
+  virtual IOStatus VerifyBackup(BackupID backup_id,
+                                bool verify_with_checksum = false) const = 0;
 };
 
 // Append-only functions of a BackupEngine. See BackupEngine comment for
@@ -457,12 +458,12 @@ class BackupEngineAppendOnlyBase {
   virtual ~BackupEngineAppendOnlyBase() {}
 
   // same as CreateNewBackup, but stores extra application metadata.
-  virtual Status CreateNewBackupWithMetadata(
+  virtual IOStatus CreateNewBackupWithMetadata(
       const CreateBackupOptions& options, DB* db,
       const std::string& app_metadata, BackupID* new_backup_id = nullptr) = 0;
 
   // keep here for backward compatibility.
-  virtual Status CreateNewBackupWithMetadata(
+  virtual IOStatus CreateNewBackupWithMetadata(
       DB* db, const std::string& app_metadata, bool flush_before_backup = false,
       std::function<void()> progress_callback = []() {}) {
     CreateBackupOptions options;
@@ -514,7 +515,7 @@ class BackupEngineAppendOnlyBase {
   // with Append or Write operations in another BackupEngine on the same
   // backup_dir, because temporary files will be treated as obsolete and
   // deleted.
-  virtual Status GarbageCollect() = 0;
+  virtual IOStatus GarbageCollect() = 0;
 };
 
 // A backup engine for organizing and managing backups.
@@ -585,13 +586,13 @@ class BackupEngine : public BackupEngineReadOnlyBase,
 
   // Deletes old backups, keeping latest num_backups_to_keep alive.
   // See also DeleteBackup.
-  virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0;
+  virtual IOStatus PurgeOldBackups(uint32_t num_backups_to_keep) = 0;
 
   // Deletes a specific backup. If this operation (or PurgeOldBackups)
   // is not completed due to crash, power failure, etc. the state
   // will be cleaned up the next time you call DeleteBackup,
   // PurgeOldBackups, or GarbageCollect.
-  virtual Status DeleteBackup(BackupID backup_id) = 0;
+  virtual IOStatus DeleteBackup(BackupID backup_id) = 0;
 };
 
 // A variant of BackupEngine that only allows "Read" operations. See
index 9f045062e589fb9f8f571b57a54cdc8addb80968..d1f8fd85e4595bec2a65fd62164339cff14059f4 100644 (file)
@@ -128,17 +128,17 @@ class BackupEngineImpl {
                    bool read_only = false);
   ~BackupEngineImpl();
 
-  Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
-                                     const std::string& app_metadata,
-                                     BackupID* new_backup_id_ptr);
+  IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options,
+                                       DB* db, const std::string& app_metadata,
+                                       BackupID* new_backup_id_ptr);
 
-  Status PurgeOldBackups(uint32_t num_backups_to_keep);
+  IOStatus PurgeOldBackups(uint32_t num_backups_to_keep);
 
-  Status DeleteBackup(BackupID backup_id);
+  IOStatus DeleteBackup(BackupID backup_id);
 
   void StopBackup() { stop_backup_.store(true, std::memory_order_release); }
 
-  Status GarbageCollect();
+  IOStatus GarbageCollect();
 
   // The returned BackupInfos are in chronological order, which means the
   // latest backup comes last.
@@ -150,21 +150,21 @@ class BackupEngineImpl {
 
   void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) const;
 
-  Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
-                             const std::string& db_dir,
-                             const std::string& wal_dir) const;
+  IOStatus RestoreDBFromBackup(const RestoreOptions& options,
+                               BackupID backup_id, const std::string& db_dir,
+                               const std::string& wal_dir) const;
 
-  Status RestoreDBFromLatestBackup(const RestoreOptions& options,
-                                   const std::string& db_dir,
-                                   const std::string& wal_dir) const {
+  IOStatus RestoreDBFromLatestBackup(const RestoreOptions& options,
+                                     const std::string& db_dir,
+                                     const std::string& wal_dir) const {
     // Note: don't read latest_valid_backup_id_ outside of lock
     return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir);
   }
 
-  Status VerifyBackup(BackupID backup_id,
-                      bool verify_with_checksum = false) const;
+  IOStatus VerifyBackup(BackupID backup_id,
+                        bool verify_with_checksum = false) const;
 
-  Status Initialize();
+  IOStatus Initialize();
 
   ShareFilesNaming GetNamingNoFlags() const {
     return options_.share_files_with_checksum_naming &
@@ -178,12 +178,12 @@ class BackupEngineImpl {
  private:
   void DeleteChildren(const std::string& dir,
                       uint32_t file_type_filter = 0) const;
-  Status DeleteBackupNoGC(BackupID backup_id);
+  IOStatus DeleteBackupNoGC(BackupID backup_id);
 
   // Extends the "result" map with pathname->size mappings for the contents of
   // "dir" in "env". Pathnames are prefixed with "dir".
-  Status ReadChildFileCurrentSizes(
-      const std::string& dir, Env* env,
+  IOStatus ReadChildFileCurrentSizes(
+      const std::string& dir, const std::shared_ptr<FileSystem>&,
       std::unordered_map<std::string, uint64_t>* result) const;
 
   struct FileInfo {
@@ -345,14 +345,15 @@ class BackupEngineImpl {
     BackupMeta(
         const std::string& meta_filename, const std::string& meta_tmp_filename,
         std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
-        Env* env)
+        Env* env, const std::shared_ptr<FileSystem>& fs)
         : timestamp_(0),
           sequence_number_(0),
           size_(0),
           meta_filename_(meta_filename),
           meta_tmp_filename_(meta_tmp_filename),
           file_infos_(file_infos),
-          env_(env) {}
+          env_(env),
+          fs_(fs) {}
 
     BackupMeta(const BackupMeta&) = delete;
     BackupMeta& operator=(const BackupMeta&) = delete;
@@ -386,9 +387,9 @@ class BackupEngineImpl {
       app_metadata_ = app_metadata;
     }
 
-    Status AddFile(std::shared_ptr<FileInfo> file_info);
+    IOStatus AddFile(std::shared_ptr<FileInfo> file_info);
 
-    Status Delete(bool delete_meta = true);
+    IOStatus Delete(bool delete_meta = true);
 
     bool Empty() const { return files_.empty(); }
 
@@ -404,12 +405,12 @@ class BackupEngineImpl {
     }
 
     // @param abs_path_to_size Pre-fetched file sizes (bytes).
-    Status LoadFromFile(
+    IOStatus LoadFromFile(
         const std::string& backup_dir,
         const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
         RateLimiter* rate_limiter, Logger* info_log,
         std::unordered_set<std::string>* reported_ignored_fields);
-    Status StoreToFile(
+    IOStatus StoreToFile(
         bool sync, const TEST_FutureSchemaVersion2Options* test_future_options);
 
     std::string GetInfoString() {
@@ -438,8 +439,8 @@ class BackupEngineImpl {
         dst_dir.replace(i, kMetaDirSlash.size(), kPrivateDirSlash);
         // Make the RemapSharedFileSystem
         std::shared_ptr<FileSystem> remap_fs =
-            std::make_shared<RemapSharedFileSystem>(
-                env_->GetFileSystem(), dst_dir, src_base_dir, files_);
+            std::make_shared<RemapSharedFileSystem>(fs_, dst_dir, src_base_dir,
+                                                    files_);
         // Make it read-only for safety
         remap_fs = std::make_shared<ReadOnlyFileSystem>(remap_fs);
         // Make an Env wrapper
@@ -462,6 +463,8 @@ class BackupEngineImpl {
     std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
     Env* env_;
     mutable std::shared_ptr<Env> env_for_open_;
+    std::shared_ptr<FileSystem> fs_;
+    IOOptions iooptions_ = IOOptions();
   };  // BackupMeta
 
   void SetBackupInfoFromBackupMeta(BackupID id, const BackupMeta& meta,
@@ -534,19 +537,19 @@ class BackupEngineImpl {
   //
   // @param src If non-empty, the file is copied from this pathname.
   // @param contents If non-empty, the file will be created with these contents.
-  Status CopyOrCreateFile(const std::string& src, const std::string& dst,
-                          const std::string& contents, Env* src_env,
-                          Env* dst_env, const EnvOptions& src_env_options,
-                          bool sync, RateLimiter* rate_limiter,
-                          uint64_t* size = nullptr,
-                          std::string* checksum_hex = nullptr,
-                          uint64_t size_limit = 0,
-                          std::function<void()> progress_callback = []() {});
-
-  Status ReadFileAndComputeChecksum(const std::string& src, Env* src_env,
-                                    const EnvOptions& src_env_options,
-                                    uint64_t size_limit,
-                                    std::string* checksum_hex) const;
+  IOStatus CopyOrCreateFile(
+      const std::string& src, const std::string& dst,
+      const std::string& contents, Env* src_env, Env* dst_env,
+      const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter,
+      uint64_t* size = nullptr, std::string* checksum_hex = nullptr,
+      uint64_t size_limit = 0,
+      std::function<void()> progress_callback = []() {});
+
+  IOStatus ReadFileAndComputeChecksum(const std::string& src,
+                                      const std::shared_ptr<FileSystem>& src_fs,
+                                      const EnvOptions& src_env_options,
+                                      uint64_t size_limit,
+                                      std::string* checksum_hex) const;
 
   // Obtain db_id and db_session_id from the table properties of file_path
   Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
@@ -563,13 +566,13 @@ class BackupEngineImpl {
       // channel when the BackupEngineImpl is shutdown, these will also have
       // Status that have not been checked.  This
       // TODO: Fix those issues so that the Status
-      status.PermitUncheckedError();
+      io_status.PermitUncheckedError();
     }
     uint64_t size;
     std::string checksum_hex;
     std::string db_id;
     std::string db_session_id;
-    Status status;
+    IOStatus io_status;
   };
 
   // Exactly one of src_path and contents must be non-empty. If src_path is
@@ -756,7 +759,7 @@ class BackupEngineImpl {
   //    copied.
   // @param fname Name of destination file and, in case of copy, source file.
   // @param contents If non-empty, the file will be created with these contents.
-  Status AddBackupFileWorkItem(
+  IOStatus AddBackupFileWorkItem(
       std::unordered_set<std::string>& live_dst_paths,
       std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
       BackupID backup_id, bool shared, const std::string& src_dir,
@@ -773,7 +776,7 @@ class BackupEngineImpl {
   BackupID latest_backup_id_;
   BackupID latest_valid_backup_id_;
   std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
-  std::map<BackupID, std::pair<Status, std::unique_ptr<BackupMeta>>>
+  std::map<BackupID, std::pair<IOStatus, std::unique_ptr<BackupMeta>>>
       corrupt_backups_;
   std::unordered_map<std::string,
                      std::shared_ptr<FileInfo>> backuped_file_infos_;
@@ -785,16 +788,19 @@ class BackupEngineImpl {
   Env* backup_env_;
 
   // directories
-  std::unique_ptr<Directory> backup_directory_;
-  std::unique_ptr<Directory> shared_directory_;
-  std::unique_ptr<Directory> meta_directory_;
-  std::unique_ptr<Directory> private_directory_;
+  std::unique_ptr<FSDirectory> backup_directory_;
+  std::unique_ptr<FSDirectory> shared_directory_;
+  std::unique_ptr<FSDirectory> meta_directory_;
+  std::unique_ptr<FSDirectory> private_directory_;
 
   static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
   bool read_only_;
   BackupStatistics backup_statistics_;
   std::unordered_set<std::string> reported_ignored_fields_;
   static const size_t kMaxAppMetaSize = 1024 * 1024;  // 1MB
+  std::shared_ptr<FileSystem> db_fs_;
+  std::shared_ptr<FileSystem> backup_fs_;
+  IOOptions io_options_ = IOOptions();
 
  public:
   std::unique_ptr<TEST_FutureSchemaVersion2Options> test_future_options_;
@@ -813,20 +819,20 @@ class BackupEngineImplThreadSafe : public BackupEngine,
   ~BackupEngineImplThreadSafe() override {}
 
   using BackupEngine::CreateNewBackupWithMetadata;
-  Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
-                                     const std::string& app_metadata,
-                                     BackupID* new_backup_id) override {
+  IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options,
+                                       DB* db, const std::string& app_metadata,
+                                       BackupID* new_backup_id) override {
     WriteLock lock(&mutex_);
     return impl_.CreateNewBackupWithMetadata(options, db, app_metadata,
                                              new_backup_id);
   }
 
-  Status PurgeOldBackups(uint32_t num_backups_to_keep) override {
+  IOStatus PurgeOldBackups(uint32_t num_backups_to_keep) override {
     WriteLock lock(&mutex_);
     return impl_.PurgeOldBackups(num_backups_to_keep);
   }
 
-  Status DeleteBackup(BackupID backup_id) override {
+  IOStatus DeleteBackup(BackupID backup_id) override {
     WriteLock lock(&mutex_);
     return impl_.DeleteBackup(backup_id);
   }
@@ -836,7 +842,7 @@ class BackupEngineImplThreadSafe : public BackupEngine,
     impl_.StopBackup();
   }
 
-  Status GarbageCollect() override {
+  IOStatus GarbageCollect() override {
     WriteLock lock(&mutex_);
     return impl_.GarbageCollect();
   }
@@ -867,23 +873,23 @@ class BackupEngineImplThreadSafe : public BackupEngine,
   }
 
   using BackupEngine::RestoreDBFromBackup;
-  Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
-                             const std::string& db_dir,
-                             const std::string& wal_dir) const override {
+  IOStatus RestoreDBFromBackup(const RestoreOptions& options,
+                               BackupID backup_id, const std::string& db_dir,
+                               const std::string& wal_dir) const override {
     ReadLock lock(&mutex_);
     return impl_.RestoreDBFromBackup(options, backup_id, db_dir, wal_dir);
   }
 
   using BackupEngine::RestoreDBFromLatestBackup;
-  Status RestoreDBFromLatestBackup(const RestoreOptions& options,
-                                   const std::string& db_dir,
-                                   const std::string& wal_dir) const override {
+  IOStatus RestoreDBFromLatestBackup(
+      const RestoreOptions& options, const std::string& db_dir,
+      const std::string& wal_dir) const override {
     // Defer to above function, which locks
     return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir);
   }
 
-  Status VerifyBackup(BackupID backup_id,
-                      bool verify_with_checksum = false) const override {
+  IOStatus VerifyBackup(BackupID backup_id,
+                        bool verify_with_checksum = false) const override {
     ReadLock lock(&mutex_);
     return impl_.VerifyBackup(backup_id, verify_with_checksum);
   }
@@ -940,6 +946,8 @@ BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options,
     options_.restore_rate_limiter.reset(
         NewGenericRateLimiter(options_.restore_rate_limit));
   }
+  db_fs_ = db_env_->GetFileSystem();
+  backup_fs_ = backup_env_->GetFileSystem();
 }
 
 BackupEngineImpl::~BackupEngineImpl() {
@@ -953,7 +961,7 @@ BackupEngineImpl::~BackupEngineImpl() {
   }
 }
 
-Status BackupEngineImpl::Initialize() {
+IOStatus BackupEngineImpl::Initialize() {
   assert(!initialized_);
   initialized_ = true;
   if (read_only_) {
@@ -976,7 +984,7 @@ Status BackupEngineImpl::Initialize() {
     }
 
     // gather the list of directories that we need to create
-    std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
+    std::vector<std::pair<std::string, std::unique_ptr<FSDirectory>*>>
         directories;
     directories.emplace_back(GetAbsolutePath(), &backup_directory_);
     if (options_.share_table_files) {
@@ -994,23 +1002,26 @@ Status BackupEngineImpl::Initialize() {
     directories.emplace_back(meta_path, &meta_directory_);
     // create all the dirs we need
     for (const auto& d : directories) {
-      auto s = backup_env_->CreateDirIfMissing(d.first);
-      if (s.ok()) {
-        s = backup_env_->NewDirectory(d.first, d.second);
+      IOStatus io_s =
+          backup_fs_->CreateDirIfMissing(d.first, io_options_, nullptr);
+      if (io_s.ok()) {
+        io_s =
+            backup_fs_->NewDirectory(d.first, io_options_, d.second, nullptr);
       }
-      if (!s.ok()) {
-        return s;
+      if (!io_s.ok()) {
+        return io_s;
       }
     }
   }
 
   std::vector<std::string> backup_meta_files;
   {
-    auto s = backup_env_->GetChildren(meta_path, &backup_meta_files);
-    if (s.IsNotFound()) {
-      return Status::NotFound(meta_path + " is missing");
-    } else if (!s.ok()) {
-      return s;
+    IOStatus io_s = backup_fs_->GetChildren(meta_path, io_options_,
+                                            &backup_meta_files, nullptr);
+    if (io_s.IsNotFound()) {
+      return IOStatus::NotFound(meta_path + " is missing");
+    } else if (!io_s.ok()) {
+      return io_s;
     }
   }
   // create backups_ structure
@@ -1034,7 +1045,7 @@ Status BackupEngineImpl::Initialize() {
         backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
                        GetBackupMetaFile(backup_id, false /* tmp */),
                        GetBackupMetaFile(backup_id, true /* tmp */),
-                       &backuped_file_infos_, backup_env_))));
+                       &backuped_file_infos_, backup_env_, backup_fs_))));
   }
 
   latest_backup_id_ = 0;
@@ -1045,12 +1056,12 @@ Status BackupEngineImpl::Initialize() {
         options_.info_log,
         "Backup Engine started with destroy_old_data == true, deleting all "
         "backups");
-    auto s = PurgeOldBackups(0);
-    if (s.ok()) {
-      s = GarbageCollect();
+    IOStatus io_s = PurgeOldBackups(0);
+    if (io_s.ok()) {
+      io_s = GarbageCollect();
     }
-    if (!s.ok()) {
-      return s;
+    if (!io_s.ok()) {
+      return io_s;
     }
   } else {  // Load data from storage
     // abs_path_to_size: maps absolute paths of files in backup directory to
@@ -1061,11 +1072,11 @@ Status BackupEngineImpl::Initialize() {
     for (const auto& rel_dir :
          {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
       const auto abs_dir = GetAbsolutePath(rel_dir);
-      Status s =
-          ReadChildFileCurrentSizes(abs_dir, backup_env_, &abs_path_to_size);
-      if (!s.ok()) {
+      IOStatus io_s =
+          ReadChildFileCurrentSizes(abs_dir, backup_fs_, &abs_path_to_size);
+      if (!io_s.ok()) {
         // I/O error likely impacting all backups
-        return s;
+        return io_s;
       }
     }
     // load the backups if any, until valid_backups_to_open of the latest
@@ -1084,26 +1095,26 @@ Status BackupEngineImpl::Initialize() {
 
       // Insert files and their sizes in backup sub-directories
       // (private/backup_id) to abs_path_to_size
-      Status s = ReadChildFileCurrentSizes(
-          GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
+      IOStatus io_s = ReadChildFileCurrentSizes(
+          GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_fs_,
           &abs_path_to_size);
-      if (s.ok()) {
-        s = backup_iter->second->LoadFromFile(
+      if (io_s.ok()) {
+        io_s = backup_iter->second->LoadFromFile(
             options_.backup_dir, abs_path_to_size,
             options_.backup_rate_limiter.get(), options_.info_log,
             &reported_ignored_fields_);
       }
-      if (s.IsCorruption() || s.IsNotSupported()) {
+      if (io_s.IsCorruption() || io_s.IsNotSupported()) {
         ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
-                       backup_iter->first, s.ToString().c_str());
-        corrupt_backups_.insert(
-            std::make_pair(backup_iter->first,
-                           std::make_pair(s, std::move(backup_iter->second))));
-      } else if (!s.ok()) {
+                       backup_iter->first, io_s.ToString().c_str());
+        corrupt_backups_.insert(std::make_pair(
+            backup_iter->first,
+            std::make_pair(io_s, std::move(backup_iter->second))));
+      } else if (!io_s.ok()) {
         // Distinguish corruption errors from errors in the backup Env.
         // Errors in the backup Env (i.e., this code path) will cause Open() to
         // fail, whereas corruption errors would not cause Open() failures.
-        return s;
+        return io_s;
       } else {
         ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
                        backup_iter->first,
@@ -1168,7 +1179,7 @@ Status BackupEngineImpl::Initialize() {
         uint64_t prev_bytes_written = IOSTATS(bytes_written);
 
         CopyOrCreateResult result;
-        result.status = CopyOrCreateFile(
+        result.io_status = CopyOrCreateFile(
             work_item.src_path, work_item.dst_path, work_item.contents,
             work_item.src_env, work_item.dst_env, work_item.src_env_options,
             work_item.sync, work_item.rate_limiter, &result.size,
@@ -1182,7 +1193,7 @@ Status BackupEngineImpl::Initialize() {
 
         result.db_id = work_item.db_id;
         result.db_session_id = work_item.db_session_id;
-        if (result.status.ok() && !work_item.src_checksum_hex.empty()) {
+        if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) {
           // unknown checksum function name implies no db table file checksum in
           // db manifest; work_item.src_checksum_hex not empty means
           // backup engine has calculated its crc32c checksum for the table
@@ -1194,9 +1205,9 @@ Status BackupEngineImpl::Initialize() {
               std::string checksum_info(
                   "Expected checksum is " + work_item.src_checksum_hex +
                   " while computed checksum is " + result.checksum_hex);
-              result.status =
-                  Status::Corruption("Checksum mismatch after copying to " +
-                                     work_item.dst_path + ": " + checksum_info);
+              result.io_status = IOStatus::Corruption(
+                  "Checksum mismatch after copying to " + work_item.dst_path +
+                  ": " + checksum_info);
             }
           } else {
             // FIXME(peterd): dead code?
@@ -1217,16 +1228,16 @@ Status BackupEngineImpl::Initialize() {
   }
   ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
 
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status BackupEngineImpl::CreateNewBackupWithMetadata(
+IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
     const CreateBackupOptions& options, DB* db, const std::string& app_metadata,
     BackupID* new_backup_id_ptr) {
   assert(initialized_);
   assert(!read_only_);
   if (app_metadata.size() > kMaxAppMetaSize) {
-    return Status::InvalidArgument("App metadata too large");
+    return IOStatus::InvalidArgument("App metadata too large");
   }
 
   if (options.decrease_background_thread_cpu_priority) {
@@ -1246,8 +1257,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
   assert(backups_.find(new_backup_id) == backups_.end());
 
   auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
-  Status s = backup_env_->FileExists(private_dir);
-  if (s.ok()) {
+  IOStatus io_s = backup_fs_->FileExists(private_dir, io_options_, nullptr);
+  if (io_s.ok()) {
     // maybe last backup failed and left partial state behind, clean it up.
     // need to do this before updating backups_ such that a private dir
     // named after new_backup_id will be cleaned up.
@@ -1255,17 +1266,17 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
     // of the latest full backup, then there could be more than one next
     // id with a private dir, the last thing to be deleted in delete
     // backup, but all will be cleaned up with a GarbageCollect.)
-    s = GarbageCollect();
-  } else if (s.IsNotFound()) {
+    io_s = GarbageCollect();
+  } else if (io_s.IsNotFound()) {
     // normal case, the new backup's private dir doesn't exist yet
-    s = Status::OK();
+    io_s = IOStatus::OK();
   }
 
   auto ret = backups_.insert(std::make_pair(
       new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
                          GetBackupMetaFile(new_backup_id, false /* tmp */),
                          GetBackupMetaFile(new_backup_id, true /* tmp */),
-                         &backuped_file_infos_, backup_env_))));
+                         &backuped_file_infos_, backup_env_, backup_fs_))));
   assert(ret.second == true);
   auto& new_backup = ret.first->second;
   new_backup->RecordTimestamp();
@@ -1283,8 +1294,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
                    "DEPRECATED and could lead to data loss.");
   }
 
-  if (s.ok()) {
-    s = backup_env_->CreateDir(private_dir);
+  if (io_s.ok()) {
+    io_s = backup_fs_->CreateDir(private_dir, io_options_, nullptr);
   }
 
   // A set into which we will insert the dst_paths that are calculated for live
@@ -1298,7 +1309,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
   Status disabled = db->DisableFileDeletions();
   DBOptions db_options = db->GetDBOptions();
   Statistics* stats = db_options.statistics.get();
-  if (s.ok()) {
+  if (io_s.ok()) {
     CheckpointImpl checkpoint(db);
     uint64_t sequence_number = 0;
     FileChecksumGenFactory* db_checksum_factory =
@@ -1312,26 +1323,27 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
             : false;
     EnvOptions src_raw_env_options(db_options);
     RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
-    s = checkpoint.CreateCustomCheckpoint(
+    io_s = status_to_io_status(checkpoint.CreateCustomCheckpoint(
         db_options,
         [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
             FileType) {
           // custom checkpoint will switch to calling copy_file_cb after it sees
           // NotSupported returned from link_file_cb.
-          return Status::NotSupported();
+          return IOStatus::NotSupported();
         } /* link_file_cb */,
         [&](const std::string& src_dirname, const std::string& fname,
             uint64_t size_limit_bytes, FileType type,
             const std::string& checksum_func_name,
             const std::string& checksum_val) {
           if (type == kWalFile && !options_.backup_log_files) {
-            return Status::OK();
+            return IOStatus::OK();
           }
           Log(options_.info_log, "add file for backup %s", fname.c_str());
           uint64_t size_bytes = 0;
-          Status st;
+          IOStatus io_st;
           if (type == kTableFile || type == kBlobFile) {
-            st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
+            io_st = db_fs_->GetFileSize(src_dirname + fname, io_options_,
+                                        &size_bytes, nullptr);
           }
           EnvOptions src_env_options;
           switch (type) {
@@ -1358,8 +1370,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
               src_env_options = src_raw_env_options;
               break;
           }
-          if (st.ok()) {
-            st = AddBackupFileWorkItem(
+          if (io_st.ok()) {
+            io_st = AddBackupFileWorkItem(
                 live_dst_paths, backup_items_to_finish, new_backup_id,
                 options_.share_table_files &&
                     (type == kTableFile || type == kBlobFile),
@@ -1370,7 +1382,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
                 options.progress_callback, "" /* contents */,
                 checksum_func_name, checksum_val);
           }
-          return st;
+          return io_st;
         } /* copy_file_cb */,
         [&](const std::string& fname, const std::string& contents,
             FileType type) {
@@ -1383,28 +1395,28 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
               false /* shared_checksum */, options.progress_callback, contents);
         } /* create_file_cb */,
         &sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64,
-        compare_checksum);
-    if (s.ok()) {
+        compare_checksum));
+    if (io_s.ok()) {
       new_backup->SetSequenceNumber(sequence_number);
     }
   }
   ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
-  Status item_status;
+  IOStatus item_io_status;
   for (auto& item : backup_items_to_finish) {
     item.result.wait();
     auto result = item.result.get();
-    item_status = result.status;
-    if (item_status.ok() && item.shared && item.needed_to_copy) {
-      item_status =
-          item.backup_env->RenameFile(item.dst_path_tmp, item.dst_path);
+    item_io_status = result.io_status;
+    if (item_io_status.ok() && item.shared && item.needed_to_copy) {
+      item_io_status = item.backup_env->GetFileSystem()->RenameFile(
+          item.dst_path_tmp, item.dst_path, io_options_, nullptr);
     }
-    if (item_status.ok()) {
-      item_status = new_backup.get()->AddFile(std::make_shared<FileInfo>(
+    if (item_io_status.ok()) {
+      item_io_status = new_backup.get()->AddFile(std::make_shared<FileInfo>(
           item.dst_relative, result.size, result.checksum_hex, result.db_id,
           result.db_session_id));
     }
-    if (!item_status.ok()) {
-      s = item_status;
+    if (!item_io_status.ok()) {
+      io_s = item_io_status;
     }
   }
 
@@ -1414,33 +1426,34 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
   }
   auto backup_time = backup_env_->NowMicros() - start_backup;
 
-  if (s.ok()) {
+  if (io_s.ok()) {
     // persist the backup metadata on the disk
-    s = new_backup->StoreToFile(options_.sync, test_future_options_.get());
+    io_s = new_backup->StoreToFile(options_.sync, test_future_options_.get());
   }
-  if (s.ok() && options_.sync) {
-    std::unique_ptr<Directory> backup_private_directory;
-    backup_env_->NewDirectory(
-        GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
-        &backup_private_directory);
+  if (io_s.ok() && options_.sync) {
+    std::unique_ptr<FSDirectory> backup_private_directory;
+    backup_fs_
+        ->NewDirectory(GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
+                       io_options_, &backup_private_directory, nullptr)
+        .PermitUncheckedError();
     if (backup_private_directory != nullptr) {
-      s = backup_private_directory->Fsync();
+      io_s = backup_private_directory->Fsync(io_options_, nullptr);
     }
-    if (s.ok() && private_directory_ != nullptr) {
-      s = private_directory_->Fsync();
+    if (io_s.ok() && private_directory_ != nullptr) {
+      io_s = private_directory_->Fsync(io_options_, nullptr);
     }
-    if (s.ok() && meta_directory_ != nullptr) {
-      s = meta_directory_->Fsync();
+    if (io_s.ok() && meta_directory_ != nullptr) {
+      io_s = meta_directory_->Fsync(io_options_, nullptr);
     }
-    if (s.ok() && shared_directory_ != nullptr) {
-      s = shared_directory_->Fsync();
+    if (io_s.ok() && shared_directory_ != nullptr) {
+      io_s = shared_directory_->Fsync(io_options_, nullptr);
     }
-    if (s.ok() && backup_directory_ != nullptr) {
-      s = backup_directory_->Fsync();
+    if (io_s.ok() && backup_directory_ != nullptr) {
+      io_s = backup_directory_->Fsync(io_options_, nullptr);
     }
   }
 
-  if (s.ok()) {
+  if (io_s.ok()) {
     backup_statistics_.IncrementNumberSuccessBackup();
     // here we know that we succeeded and installed the new backup
     latest_backup_id_ = new_backup_id;
@@ -1466,7 +1479,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
     backup_statistics_.IncrementNumberFailBackup();
     // clean all the files we might have created
     ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
-                   s.ToString().c_str());
+                   io_s.ToString().c_str());
     ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
                    backup_statistics_.ToString().c_str());
     // delete files that we might have already written
@@ -1477,15 +1490,15 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
   RecordTick(stats, BACKUP_READ_BYTES, IOSTATS(bytes_read) - prev_bytes_read);
   RecordTick(stats, BACKUP_WRITE_BYTES,
              IOSTATS(bytes_written) - prev_bytes_written);
-  return s;
+  return io_s;
 }
 
-Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
+IOStatus BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
   assert(initialized_);
   assert(!read_only_);
 
   // Best effort deletion even with errors
-  Status overall_status = Status::OK();
+  IOStatus overall_status = IOStatus::OK();
 
   ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
                  num_backups_to_keep);
@@ -1497,25 +1510,25 @@ Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
   }
   for (auto backup_id : to_delete) {
     // Do not GC until end
-    auto s = DeleteBackupNoGC(backup_id);
-    if (!s.ok()) {
-      overall_status = s;
+    IOStatus io_s = DeleteBackupNoGC(backup_id);
+    if (!io_s.ok()) {
+      overall_status = io_s;
     }
   }
   // Clean up after any incomplete backup deletion, potentially from
   // earlier session.
   if (might_need_garbage_collect_) {
-    auto s = GarbageCollect();
-    if (!s.ok() && overall_status.ok()) {
-      overall_status = s;
+    IOStatus io_s = GarbageCollect();
+    if (!io_s.ok() && overall_status.ok()) {
+      overall_status = io_s;
     }
   }
   return overall_status;
 }
 
-Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
-  auto s1 = DeleteBackupNoGC(backup_id);
-  auto s2 = Status::OK();
+IOStatus BackupEngineImpl::DeleteBackup(BackupID backup_id) {
+  IOStatus s1 = DeleteBackupNoGC(backup_id);
+  IOStatus s2 = IOStatus::OK();
 
   // Clean up after any incomplete backup deletion, potentially from
   // earlier session.
@@ -1534,26 +1547,26 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
 }
 
 // Does not auto-GarbageCollect nor lock
-Status BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) {
+IOStatus BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) {
   assert(initialized_);
   assert(!read_only_);
 
   ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
   auto backup = backups_.find(backup_id);
   if (backup != backups_.end()) {
-    auto s = backup->second->Delete();
-    if (!s.ok()) {
-      return s;
+    IOStatus io_s = backup->second->Delete();
+    if (!io_s.ok()) {
+      return io_s;
     }
     backups_.erase(backup);
   } else {
     auto corrupt = corrupt_backups_.find(backup_id);
     if (corrupt == corrupt_backups_.end()) {
-      return Status::NotFound("Backup not found");
+      return IOStatus::NotFound("Backup not found");
     }
-    auto s = corrupt->second.second->Delete();
-    if (!s.ok()) {
-      return s;
+    IOStatus io_s = corrupt->second.second->Delete();
+    if (!io_s.ok()) {
+      return io_s;
     }
     corrupt->second.first.PermitUncheckedError();
     corrupt_backups_.erase(corrupt);
@@ -1565,11 +1578,12 @@ Status BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) {
   std::vector<std::string> to_delete;
   for (auto& itr : backuped_file_infos_) {
     if (itr.second->refs == 0) {
-      Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
+      IOStatus io_s = backup_fs_->DeleteFile(GetAbsolutePath(itr.first),
+                                             io_options_, nullptr);
       ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
-                     s.ToString().c_str());
+                     io_s.ToString().c_str());
       to_delete.push_back(itr.first);
-      if (!s.ok()) {
+      if (!io_s.ok()) {
         // Trying again later might work
         might_need_garbage_collect_ = true;
       }
@@ -1582,14 +1596,15 @@ Status BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) {
   // take care of private dirs -- GarbageCollect() will take care of them
   // if they are not empty
   std::string private_dir = GetPrivateFileRel(backup_id);
-  Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
+  IOStatus io_s =
+      backup_fs_->DeleteDir(GetAbsolutePath(private_dir), io_options_, nullptr);
   ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
-                 private_dir.c_str(), s.ToString().c_str());
-  if (!s.ok()) {
+                 private_dir.c_str(), io_s.ToString().c_str());
+  if (!io_s.ok()) {
     // Full gc or trying again later might work
     might_need_garbage_collect_ = true;
   }
-  return Status::OK();
+  return IOStatus::OK();
 }
 
 void BackupEngineImpl::SetBackupInfoFromBackupMeta(
@@ -1660,10 +1675,9 @@ void BackupEngineImpl::GetCorruptedBackups(
   }
 }
 
-Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
-                                             BackupID backup_id,
-                                             const std::string& db_dir,
-                                             const std::string& wal_dir) const {
+IOStatus BackupEngineImpl::RestoreDBFromBackup(
+    const RestoreOptions& options, BackupID backup_id,
+    const std::string& db_dir, const std::string& wal_dir) const {
   assert(initialized_);
   if (backup_id == kLatestBackupIDMarker) {
     // Note: Read latest_valid_backup_id_ inside of lock
@@ -1675,11 +1689,11 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
   }
   auto backup_itr = backups_.find(backup_id);
   if (backup_itr == backups_.end()) {
-    return Status::NotFound("Backup not found");
+    return IOStatus::NotFound("Backup not found");
   }
   auto& backup = backup_itr->second;
   if (backup->Empty()) {
-    return Status::NotFound("Backup not found");
+    return IOStatus::NotFound("Backup not found");
   }
 
   ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
@@ -1687,8 +1701,10 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
                  static_cast<int>(options.keep_log_files));
 
   // just in case. Ignore errors
-  db_env_->CreateDirIfMissing(db_dir).PermitUncheckedError();
-  db_env_->CreateDirIfMissing(wal_dir).PermitUncheckedError();
+  db_fs_->CreateDirIfMissing(db_dir, io_options_, nullptr)
+      .PermitUncheckedError();
+  db_fs_->CreateDirIfMissing(wal_dir, io_options_, nullptr)
+      .PermitUncheckedError();
 
   if (options.keep_log_files) {
     // delete files in db_dir, but keep all the log files
@@ -1696,7 +1712,7 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
     // move all the files from archive dir to wal_dir
     std::string archive_dir = ArchivalDirectory(wal_dir);
     std::vector<std::string> archive_files;
-    db_env_->GetChildren(archive_dir, &archive_files)
+    db_fs_->GetChildren(archive_dir, io_options_, &archive_files, nullptr)
         .PermitUncheckedError();  // ignore errors
     for (const auto& f : archive_files) {
       uint64_t number;
@@ -1706,12 +1722,12 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
         ROCKS_LOG_INFO(options_.info_log,
                        "Moving log file from archive/ to wal_dir: %s",
                        f.c_str());
-        Status s =
-            db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
-        if (!s.ok()) {
+        IOStatus io_s = db_fs_->RenameFile(
+            archive_dir + "/" + f, wal_dir + "/" + f, io_options_, nullptr);
+        if (!io_s.ok()) {
           // if we can't move log file from archive_dir to wal_dir,
           // we should fail, since it might mean data loss
-          return s;
+          return io_s;
         }
       }
     }
@@ -1721,12 +1737,12 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
     DeleteChildren(db_dir);
   }
 
-  Status s;
+  IOStatus io_s;
   std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
   std::string temporary_current_file;
   std::string final_current_file;
-  std::unique_ptr<Directory> db_dir_for_fsync;
-  std::unique_ptr<Directory> wal_dir_for_fsync;
+  std::unique_ptr<FSDirectory> db_dir_for_fsync;
+  std::unique_ptr<FSDirectory> wal_dir_for_fsync;
 
   for (const auto& file_info : backup->GetFiles()) {
     const std::string& file = file_info->filename;
@@ -1738,25 +1754,27 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
     FileType type;
     bool ok = ParseFileName(dst, &number, &type);
     if (!ok) {
-      return Status::Corruption("Backup corrupted: Fail to parse filename " +
-                                dst);
+      return IOStatus::Corruption("Backup corrupted: Fail to parse filename " +
+                                  dst);
     }
     // 3. Construct the final path
     // kWalFile lives in wal_dir and all the rest live in db_dir
     if (type == kWalFile) {
       dst = wal_dir + "/" + dst;
       if (options_.sync && !wal_dir_for_fsync) {
-        s = db_env_->NewDirectory(wal_dir, &wal_dir_for_fsync);
-        if (!s.ok()) {
-          return s;
+        io_s = db_fs_->NewDirectory(wal_dir, io_options_, &wal_dir_for_fsync,
+                                    nullptr);
+        if (!io_s.ok()) {
+          return io_s;
         }
       }
     } else {
       dst = db_dir + "/" + dst;
       if (options_.sync && !db_dir_for_fsync) {
-        s = db_env_->NewDirectory(db_dir, &db_dir_for_fsync);
-        if (!s.ok()) {
-          return s;
+        io_s = db_fs_->NewDirectory(db_dir, io_options_, &db_dir_for_fsync,
+                                    nullptr);
+        if (!io_s.ok()) {
+          return io_s;
         }
       }
     }
@@ -1782,19 +1800,19 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
     restore_items_to_finish.push_back(
         std::move(after_copy_or_create_work_item));
   }
-  Status item_status;
+  IOStatus item_io_status;
   for (auto& item : restore_items_to_finish) {
     item.result.wait();
     auto result = item.result.get();
-    item_status = result.status;
+    item_io_status = result.io_status;
     // Note: It is possible that both of the following bad-status cases occur
     // during copying. But, we only return one status.
-    if (!item_status.ok()) {
-      s = item_status;
+    if (!item_io_status.ok()) {
+      io_s = item_io_status;
       break;
     } else if (!item.checksum_hex.empty() &&
                item.checksum_hex != result.checksum_hex) {
-      s = Status::Corruption(
+      io_s = IOStatus::Corruption(
           "While restoring " + item.from_file + " -> " + item.to_file +
           ": expected checksum is " + item.checksum_hex +
           " while computed checksum is " + result.checksum_hex);
@@ -1804,36 +1822,37 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
 
   // When enabled, the first Fsync is to ensure all files are fully persisted
   // before renaming CURRENT.tmp
-  if (s.ok() && db_dir_for_fsync) {
+  if (io_s.ok() && db_dir_for_fsync) {
     ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n");
-    s = db_dir_for_fsync->Fsync();
+    io_s = db_dir_for_fsync->Fsync(io_options_, nullptr);
   }
 
-  if (s.ok() && wal_dir_for_fsync) {
-    s = wal_dir_for_fsync->Fsync();
+  if (io_s.ok() && wal_dir_for_fsync) {
+    io_s = wal_dir_for_fsync->Fsync(io_options_, nullptr);
   }
 
-  if (s.ok() && !temporary_current_file.empty()) {
+  if (io_s.ok() && !temporary_current_file.empty()) {
     ROCKS_LOG_INFO(options_.info_log, "Restore: atomic rename CURRENT.tmp\n");
     assert(!final_current_file.empty());
-    s = db_env_->RenameFile(temporary_current_file, final_current_file);
+    io_s = db_fs_->RenameFile(temporary_current_file, final_current_file,
+                              io_options_, nullptr);
   }
 
-  if (s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) {
+  if (io_s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) {
     // Second Fsync is to ensure the final atomic rename of DB restore is
     // fully persisted even if power goes out right after restore operation
     // returns success
     assert(db_dir_for_fsync);
-    s = db_dir_for_fsync->Fsync();
+    io_s = db_dir_for_fsync->Fsync(io_options_, nullptr);
   }
 
   ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
-                 s.ToString().c_str());
-  return s;
+                 io_s.ToString().c_str());
+  return io_s;
 }
 
-Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
-                                      bool verify_with_checksum) const {
+IOStatus BackupEngineImpl::VerifyBackup(BackupID backup_id,
+                                        bool verify_with_checksum) const {
   assert(initialized_);
   // Check if backup_id is corrupted, or valid and registered
   auto corrupt_itr = corrupt_backups_.find(backup_id);
@@ -1843,12 +1862,12 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
 
   auto backup_itr = backups_.find(backup_id);
   if (backup_itr == backups_.end()) {
-    return Status::NotFound();
+    return IOStatus::NotFound();
   }
 
   auto& backup = backup_itr->second;
   if (backup->Empty()) {
-    return Status::NotFound();
+    return IOStatus::NotFound();
   }
 
   ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);
@@ -1860,7 +1879,7 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
     const auto abs_dir = GetAbsolutePath(rel_dir);
     // Shared directories allowed to be missing in some cases. Expected but
     // missing files will be reported a few lines down.
-    ReadChildFileCurrentSizes(abs_dir, backup_env_, &curr_abs_path_to_size)
+    ReadChildFileCurrentSizes(abs_dir, backup_fs_, &curr_abs_path_to_size)
         .PermitUncheckedError();
   }
 
@@ -1869,7 +1888,7 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
     const auto abs_path = GetAbsolutePath(file_info->filename);
     // check existence of the file
     if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
-      return Status::NotFound("File missing: " + abs_path);
+      return IOStatus::NotFound("File missing: " + abs_path);
     }
     // verify file size
     if (file_info->size != curr_abs_path_to_size[abs_path]) {
@@ -1877,37 +1896,38 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
                             ToString(file_info->size) +
                             " while found file size is " +
                             ToString(curr_abs_path_to_size[abs_path]));
-      return Status::Corruption("File corrupted: File size mismatch for " +
-                                abs_path + ": " + size_info);
+      return IOStatus::Corruption("File corrupted: File size mismatch for " +
+                                  abs_path + ": " + size_info);
     }
     if (verify_with_checksum && !file_info->checksum_hex.empty()) {
       // verify file checksum
       std::string checksum_hex;
       ROCKS_LOG_INFO(options_.info_log, "Verifying %s checksum...\n",
                      abs_path.c_str());
-      Status s = ReadFileAndComputeChecksum(abs_path, backup_env_, EnvOptions(),
-                                            0 /* size_limit */, &checksum_hex);
-      if (!s.ok()) {
-        return s;
+      IOStatus io_s =
+          ReadFileAndComputeChecksum(abs_path, backup_fs_, EnvOptions(),
+                                     0 /* size_limit */, &checksum_hex);
+      if (!io_s.ok()) {
+        return io_s;
       } else if (file_info->checksum_hex != checksum_hex) {
         std::string checksum_info(
             "Expected checksum is " + file_info->checksum_hex +
             " while computed checksum is " + checksum_hex);
-        return Status::Corruption("File corrupted: Checksum mismatch for " +
-                                  abs_path + ": " + checksum_info);
+        return IOStatus::Corruption("File corrupted: Checksum mismatch for " +
+                                    abs_path + ": " + checksum_info);
       }
     }
   }
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status BackupEngineImpl::CopyOrCreateFile(
+IOStatus BackupEngineImpl::CopyOrCreateFile(
     const std::string& src, const std::string& dst, const std::string& contents,
     Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
     RateLimiter* rate_limiter, uint64_t* size, std::string* checksum_hex,
     uint64_t size_limit, std::function<void()> progress_callback) {
   assert(src.empty() != contents.empty());
-  Status s;
+  IOStatus io_s;
   std::unique_ptr<FSWritableFile> dst_file;
   std::unique_ptr<FSSequentialFile> src_file;
   FileOptions dst_file_options;
@@ -1923,14 +1943,14 @@ Status BackupEngineImpl::CopyOrCreateFile(
     size_limit = std::numeric_limits<uint64_t>::max();
   }
 
-  s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options,
-                                                &dst_file, nullptr);
-  if (s.ok() && !src.empty()) {
-    s = src_env->GetFileSystem()->NewSequentialFile(
+  io_s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options,
+                                                   &dst_file, nullptr);
+  if (io_s.ok() && !src.empty()) {
+    io_s = src_env->GetFileSystem()->NewSequentialFile(
         src, FileOptions(src_env_options), &src_file, nullptr);
   }
-  if (!s.ok()) {
-    return s;
+  if (!io_s.ok()) {
+    return io_s;
   }
 
   size_t buf_size =
@@ -1950,12 +1970,12 @@ Status BackupEngineImpl::CopyOrCreateFile(
   uint64_t processed_buffer_size = 0;
   do {
     if (stop_backup_.load(std::memory_order_acquire)) {
-      return Status::Incomplete("Backup stopped");
+      return status_to_io_status(Status::Incomplete("Backup stopped"));
     }
     if (!src.empty()) {
       size_t buffer_to_read =
           (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
-      s = src_reader->Read(buffer_to_read, &data, buf.get());
+      io_s = src_reader->Read(buffer_to_read, &data, buf.get());
       if (rate_limiter != nullptr) {
         rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
                               RateLimiter::OpType::kRead);
@@ -1970,8 +1990,8 @@ Status BackupEngineImpl::CopyOrCreateFile(
         (src.length() > 4 && src.rfind(".sst") == src.length() - 4) ? &data
                                                                     : nullptr);
 
-    if (!s.ok()) {
-      return s;
+    if (!io_s.ok()) {
+      return io_s;
     }
 
     if (size != nullptr) {
@@ -1980,7 +2000,7 @@ Status BackupEngineImpl::CopyOrCreateFile(
     if (checksum_hex != nullptr) {
       checksum_value = crc32c::Extend(checksum_value, data.data(), data.size());
     }
-    s = dest_writer->Append(data);
+    io_s = dest_writer->Append(data);
     if (rate_limiter != nullptr) {
       rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
                             RateLimiter::OpType::kWrite);
@@ -1990,24 +2010,24 @@ Status BackupEngineImpl::CopyOrCreateFile(
       std::lock_guard<std::mutex> lock(byte_report_mutex_);
       progress_callback();
     }
-  } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
+  } while (io_s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
 
   // Convert uint32_t checksum to hex checksum
   if (checksum_hex != nullptr) {
     checksum_hex->assign(ChecksumInt32ToHex(checksum_value));
   }
 
-  if (s.ok() && sync) {
-    s = dest_writer->Sync(false);
+  if (io_s.ok() && sync) {
+    io_s = dest_writer->Sync(false);
   }
-  if (s.ok()) {
-    s = dest_writer->Close();
+  if (io_s.ok()) {
+    io_s = dest_writer->Close();
   }
-  return s;
+  return io_s;
 }
 
 // fname will always start with "/"
-Status BackupEngineImpl::AddBackupFileWorkItem(
+IOStatus BackupEngineImpl::AddBackupFileWorkItem(
     std::unordered_set<std::string>& live_dst_paths,
     std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
     BackupID backup_id, bool shared, const std::string& src_dir,
@@ -2040,7 +2060,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
   // Step 0: Check if default checksum function name is passed in
   if (kDbFileChecksumFuncName == src_checksum_func_name) {
     if (src_checksum_str == kUnknownFileChecksum) {
-      return Status::Aborted("Unknown checksum value for " + fname);
+      return status_to_io_status(
+          Status::Aborted("Unknown checksum value for " + fname));
     }
     checksum_hex = ChecksumStrToHex(src_checksum_str);
   }
@@ -2061,14 +2082,14 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
     // since the session id should suffice to avoid file name collision in
     // the shared_checksum directory.
     if (checksum_hex.empty() && db_session_id.empty()) {
-      Status s = ReadFileAndComputeChecksum(
-          src_dir + fname, db_env_, src_env_options, size_limit, &checksum_hex);
-      if (!s.ok()) {
-        return s;
+      IOStatus io_s = ReadFileAndComputeChecksum(
+          src_dir + fname, db_fs_, src_env_options, size_limit, &checksum_hex);
+      if (!io_s.ok()) {
+        return io_s;
       }
     }
     if (size_bytes == port::kMaxUint64) {
-      return Status::NotFound("File missing: " + src_dir + fname);
+      return IOStatus::NotFound("File missing: " + src_dir + fname);
     }
     // dst_relative depends on the following conditions:
     // 1) the naming scheme is kUseDbSessionId,
@@ -2122,7 +2143,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
   if (shared && !same_path) {
     // Should be in shared directory but not a live path, check existence in
     // shared directory
-    Status exist = backup_env_->FileExists(final_dest_path);
+    IOStatus exist =
+        backup_fs_->FileExists(final_dest_path, io_options_, nullptr);
     if (exist.ok()) {
       file_exists = true;
     } else if (exist.IsNotFound()) {
@@ -2146,7 +2168,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
           fname.c_str());
       need_to_copy = true;
       // Defer any failure reporting to when we try to write the file
-      backup_env_->DeleteFile(final_dest_path).PermitUncheckedError();
+      backup_fs_->DeleteFile(final_dest_path, io_options_, nullptr)
+          .PermitUncheckedError();
     } else {
       // file exists and referenced
       if (checksum_hex.empty()) {
@@ -2178,11 +2201,11 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
           // ID, but even in that case, we double check the file sizes in
           // BackupMeta::AddFile.
         } else {
-          Status s = ReadFileAndComputeChecksum(src_dir + fname, db_env_,
-                                                src_env_options, size_limit,
-                                                &checksum_hex);
-          if (!s.ok()) {
-            return s;
+          IOStatus io_s = ReadFileAndComputeChecksum(src_dir + fname, db_fs_,
+                                                     src_env_options,
+                                                     size_limit, &checksum_hex);
+          if (!io_s.ok()) {
+            return io_s;
           }
         }
       }
@@ -2222,21 +2245,22 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
         temp_dest_path, final_dest_path, dst_relative);
     backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
     CopyOrCreateResult result;
-    result.status = Status::OK();
+    result.io_status = IOStatus::OK();
     result.size = size_bytes;
     result.checksum_hex = std::move(checksum_hex);
     result.db_id = std::move(db_id);
     result.db_session_id = std::move(db_session_id);
     promise_result.set_value(std::move(result));
   }
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status BackupEngineImpl::ReadFileAndComputeChecksum(
-    const std::string& src, Env* src_env, const EnvOptions& src_env_options,
-    uint64_t size_limit, std::string* checksum_hex) const {
+IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
+    const std::string& src, const std::shared_ptr<FileSystem>& src_fs,
+    const EnvOptions& src_env_options, uint64_t size_limit,
+    std::string* checksum_hex) const {
   if (checksum_hex == nullptr) {
-    return Status::Aborted("Checksum pointer is null");
+    return status_to_io_status(Status::Aborted("Checksum pointer is null"));
   }
   uint32_t checksum_value = 0;
   if (size_limit == 0) {
@@ -2244,11 +2268,10 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
   }
 
   std::unique_ptr<SequentialFileReader> src_reader;
-  Status s = SequentialFileReader::Create(src_env->GetFileSystem(), src,
-                                          FileOptions(src_env_options),
-                                          &src_reader, nullptr);
-  if (!s.ok()) {
-    return s;
+  IOStatus io_s = SequentialFileReader::Create(
+      src_fs, src, FileOptions(src_env_options), &src_reader, nullptr);
+  if (!io_s.ok()) {
+    return io_s;
   }
 
   RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
@@ -2260,17 +2283,17 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
 
   do {
     if (stop_backup_.load(std::memory_order_acquire)) {
-      return Status::Incomplete("Backup stopped");
+      return status_to_io_status(Status::Incomplete("Backup stopped"));
     }
     size_t buffer_to_read =
         (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
-    s = src_reader->Read(buffer_to_read, &data, buf.get());
+    io_s = src_reader->Read(buffer_to_read, &data, buf.get());
     if (rate_limiter != nullptr) {
       rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
                             RateLimiter::OpType::kRead);
     }
-    if (!s.ok()) {
-      return s;
+    if (!io_s.ok()) {
+      return io_s;
     }
 
     size_limit -= data.size();
@@ -2279,7 +2302,7 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
 
   checksum_hex->assign(ChecksumInt32ToHex(checksum_value));
 
-  return s;
+  return io_s;
 }
 
 Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
@@ -2347,7 +2370,8 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
 void BackupEngineImpl::DeleteChildren(const std::string& dir,
                                       uint32_t file_type_filter) const {
   std::vector<std::string> children;
-  db_env_->GetChildren(dir, &children).PermitUncheckedError();  // ignore errors
+  db_fs_->GetChildren(dir, io_options_, &children, nullptr)
+      .PermitUncheckedError();  // ignore errors
 
   for (const auto& f : children) {
     uint64_t number;
@@ -2357,36 +2381,38 @@ void BackupEngineImpl::DeleteChildren(const std::string& dir,
       // don't delete this file
       continue;
     }
-    db_env_->DeleteFile(dir + "/" + f).PermitUncheckedError();  // ignore errors
+    db_fs_->DeleteFile(dir + "/" + f, io_options_, nullptr)
+        .PermitUncheckedError();  // ignore errors
   }
 }
 
-Status BackupEngineImpl::ReadChildFileCurrentSizes(
-    const std::string& dir, Env* env,
+IOStatus BackupEngineImpl::ReadChildFileCurrentSizes(
+    const std::string& dir, const std::shared_ptr<FileSystem>& fs,
     std::unordered_map<std::string, uint64_t>* result) const {
   assert(result != nullptr);
   std::vector<Env::FileAttributes> files_attrs;
-  Status status = env->FileExists(dir);
-  if (status.ok()) {
-    status = env->GetChildrenFileAttributes(dir, &files_attrs);
-  } else if (status.IsNotFound()) {
+  IOStatus io_status = fs->FileExists(dir, io_options_, nullptr);
+  if (io_status.ok()) {
+    io_status =
+        fs->GetChildrenFileAttributes(dir, io_options_, &files_attrs, nullptr);
+  } else if (io_status.IsNotFound()) {
     // Insert no entries can be considered success
-    status = Status::OK();
+    io_status = IOStatus::OK();
   }
   const bool slash_needed = dir.empty() || dir.back() != '/';
   for (const auto& file_attrs : files_attrs) {
     result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
                     file_attrs.size_bytes);
   }
-  return status;
+  return io_status;
 }
 
-Status BackupEngineImpl::GarbageCollect() {
+IOStatus BackupEngineImpl::GarbageCollect() {
   assert(!read_only_);
 
   // We will make a best effort to remove all garbage even in the presence
   // of inconsistencies or I/O failures that inhibit finding garbage.
-  Status overall_status = Status::OK();
+  IOStatus overall_status = IOStatus::OK();
   // If all goes well, we don't need another auto-GC this session
   might_need_garbage_collect_ = false;
 
@@ -2402,14 +2428,15 @@ Status BackupEngineImpl::GarbageCollect() {
       } else {
         shared_path = GetAbsolutePath(GetSharedFileRel());
       }
-      auto s = backup_env_->FileExists(shared_path);
-      if (s.ok()) {
-        s = backup_env_->GetChildren(shared_path, &shared_children);
-      } else if (s.IsNotFound()) {
-        s = Status::OK();
+      IOStatus io_s = backup_fs_->FileExists(shared_path, io_options_, nullptr);
+      if (io_s.ok()) {
+        io_s = backup_fs_->GetChildren(shared_path, io_options_,
+                                       &shared_children, nullptr);
+      } else if (io_s.IsNotFound()) {
+        io_s = IOStatus::OK();
       }
-      if (!s.ok()) {
-        overall_status = s;
+      if (!io_s.ok()) {
+        overall_status = io_s;
         // Trying again later might work
         might_need_garbage_collect_ = true;
       }
@@ -2427,11 +2454,12 @@ Status BackupEngineImpl::GarbageCollect() {
           child_itr->second->refs == 0) {
         // this might be a directory, but DeleteFile will just fail in that
         // case, so we're good
-        Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
+        IOStatus io_s = backup_fs_->DeleteFile(GetAbsolutePath(rel_fname),
+                                               io_options_, nullptr);
         ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
-                       rel_fname.c_str(), s.ToString().c_str());
+                       rel_fname.c_str(), io_s.ToString().c_str());
         backuped_file_infos_.erase(rel_fname);
-        if (!s.ok()) {
+        if (!io_s.ok()) {
           // Trying again later might work
           might_need_garbage_collect_ = true;
         }
@@ -2442,10 +2470,11 @@ Status BackupEngineImpl::GarbageCollect() {
   // delete obsolete private files
   std::vector<std::string> private_children;
   {
-    auto s = backup_env_->GetChildren(GetAbsolutePath(kPrivateDirName),
-                                      &private_children);
-    if (!s.ok()) {
-      overall_status = s;
+    IOStatus io_s =
+        backup_fs_->GetChildren(GetAbsolutePath(kPrivateDirName), io_options_,
+                                &private_children, nullptr);
+    if (!io_s.ok()) {
+      overall_status = io_s;
       // Trying again later might work
       might_need_garbage_collect_ = true;
     }
@@ -2463,23 +2492,27 @@ Status BackupEngineImpl::GarbageCollect() {
     std::string full_private_path =
         GetAbsolutePath(GetPrivateFileRel(backup_id));
     std::vector<std::string> subchildren;
-    if (backup_env_->GetChildren(full_private_path, &subchildren).ok()) {
+    if (backup_fs_
+            ->GetChildren(full_private_path, io_options_, &subchildren, nullptr)
+            .ok()) {
       for (auto& subchild : subchildren) {
-        Status s = backup_env_->DeleteFile(full_private_path + subchild);
+        IOStatus io_s = backup_fs_->DeleteFile(full_private_path + subchild,
+                                               io_options_, nullptr);
         ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                        (full_private_path + subchild).c_str(),
-                       s.ToString().c_str());
-        if (!s.ok()) {
+                       io_s.ToString().c_str());
+        if (!io_s.ok()) {
           // Trying again later might work
           might_need_garbage_collect_ = true;
         }
       }
     }
     // finally delete the private dir
-    Status s = backup_env_->DeleteDir(full_private_path);
+    IOStatus io_s =
+        backup_fs_->DeleteDir(full_private_path, io_options_, nullptr);
     ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
-                   full_private_path.c_str(), s.ToString().c_str());
-    if (!s.ok()) {
+                   full_private_path.c_str(), io_s.ToString().c_str());
+    if (!io_s.ok()) {
       // Trying again later might work
       might_need_garbage_collect_ = true;
     }
@@ -2491,7 +2524,7 @@ Status BackupEngineImpl::GarbageCollect() {
 
 // ------- BackupMeta class --------
 
-Status BackupEngineImpl::BackupMeta::AddFile(
+IOStatus BackupEngineImpl::BackupMeta::AddFile(
     std::shared_ptr<FileInfo> file_info) {
   auto itr = file_infos_->find(file_info->filename);
   if (itr == file_infos_->end()) {
@@ -2501,7 +2534,7 @@ Status BackupEngineImpl::BackupMeta::AddFile(
       itr->second->refs = 1;
     } else {
       // if this happens, something is seriously wrong
-      return Status::Corruption("In memory metadata insertion error");
+      return IOStatus::Corruption("In memory metadata insertion error");
     }
   } else {
     // Compare sizes, because we scanned that off the filesystem on both
@@ -2514,7 +2547,7 @@ Status BackupEngineImpl::BackupMeta::AddFile(
       msg.append(
           " If this DB file checks as not corrupt, try deleting old"
           " backups or backing up to a different backup directory.");
-      return Status::Corruption(msg);
+      return IOStatus::Corruption(msg);
     }
     if (file_info->checksum_hex.empty()) {
       // No checksum available to check
@@ -2536,7 +2569,7 @@ Status BackupEngineImpl::BackupMeta::AddFile(
       msg.append(
           " If this DB file checks as not corrupt, try deleting old"
           " backups or backing up to a different backup directory.");
-      return Status::Corruption(msg);
+      return IOStatus::Corruption(msg);
     }
     ++itr->second->refs;  // increase refcount if already present
   }
@@ -2544,26 +2577,26 @@ Status BackupEngineImpl::BackupMeta::AddFile(
   size_ += file_info->size;
   files_.push_back(itr->second);
 
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
-  Status s;
+IOStatus BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
+  IOStatus io_s;
   for (const auto& file : files_) {
     --file->refs;  // decrease refcount
   }
   files_.clear();
   // delete meta file
   if (delete_meta) {
-    s = env_->FileExists(meta_filename_);
-    if (s.ok()) {
-      s = env_->DeleteFile(meta_filename_);
-    } else if (s.IsNotFound()) {
-      s = Status::OK();  // nothing to delete
+    io_s = fs_->FileExists(meta_filename_, iooptions_, nullptr);
+    if (io_s.ok()) {
+      io_s = fs_->DeleteFile(meta_filename_, iooptions_, nullptr);
+    } else if (io_s.IsNotFound()) {
+      io_s = IOStatus::OK();  // nothing to delete
     }
   }
   timestamp_ = 0;
-  return s;
+  return io_s;
 }
 
 // Constants for backup meta file schema (see LoadFromFile)
@@ -2637,7 +2670,7 @@ const std::string kNonIgnorableFieldPrefix{"ni::"};
 // * Footer meta fields:
 //   * None yet (future use for meta file checksum anticipated)
 //
-Status BackupEngineImpl::BackupMeta::LoadFromFile(
+IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
     const std::string& backup_dir,
     const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
     RateLimiter* rate_limiter, Logger* info_log,
@@ -2647,11 +2680,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
 
   std::unique_ptr<LineFileReader> backup_meta_reader;
   {
-    Status s =
-        LineFileReader::Create(env_->GetFileSystem(), meta_filename_,
-                               FileOptions(), &backup_meta_reader, nullptr);
-    if (!s.ok()) {
-      return s;
+    IOStatus io_s = LineFileReader::Create(fs_, meta_filename_, FileOptions(),
+                                           &backup_meta_reader, nullptr);
+    if (!io_s.ok()) {
+      return io_s;
     }
   }
 
@@ -2671,12 +2703,12 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
       if (ver == "2" || StartsWith(ver, "2.")) {
         schema_major_version = 2;
       } else {
-        return Status::NotSupported(
+        return IOStatus::NotSupported(
             "Unsupported/unrecognized schema version: " + ver);
       }
       line.clear();
     } else if (line.empty()) {
-      return Status::Corruption("Unexpected empty line");
+      return IOStatus::Corruption("Unexpected empty line");
     }
   }
   if (!line.empty()) {
@@ -2702,7 +2734,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
                             RateLimiter::OpType::kRead);
     }
     if (line.empty()) {
-      return Status::Corruption("Unexpected empty line");
+      return IOStatus::Corruption("Unexpected empty line");
     }
     // Number -> number of files -> exit loop reading optional meta fields
     if (line[0] >= '0' && line[0] <= '9') {
@@ -2712,7 +2744,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
     // else, must be a meta field assignment
     auto space_pos = line.find_first_of(' ');
     if (space_pos == std::string::npos) {
-      return Status::Corruption("Expected number of files or meta field");
+      return IOStatus::Corruption("Expected number of files or meta field");
     }
     std::string field_name = line.substr(0, space_pos);
     std::string field_data = line.substr(space_pos + 1);
@@ -2720,15 +2752,15 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
       // app metadata present
       bool decode_success = Slice(field_data).DecodeHex(&app_metadata_);
       if (!decode_success) {
-        return Status::Corruption(
+        return IOStatus::Corruption(
             "Failed to decode stored hex encoded app metadata");
       }
     } else if (schema_major_version < 2) {
-      return Status::Corruption("Expected number of files or \"" +
-                                kAppMetaDataFieldName + "\" field");
+      return IOStatus::Corruption("Expected number of files or \"" +
+                                  kAppMetaDataFieldName + "\" field");
     } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
-      return Status::NotSupported("Unrecognized non-ignorable meta field " +
-                                  field_name + " (from future version?)");
+      return IOStatus::NotSupported("Unrecognized non-ignorable meta field " +
+                                    field_name + " (from future version?)");
     } else {
       // Warn the first time we see any particular unrecognized meta field
       if (reported_ignored_fields->insert("meta:" + field_name).second) {
@@ -2747,7 +2779,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
     std::vector<std::string> components = StringSplit(line, ' ');
 
     if (components.size() < 1) {
-      return Status::Corruption("Empty line instead of file entry.");
+      return IOStatus::Corruption("Empty line instead of file entry.");
     }
     if (schema_major_version >= 2 && components.size() == 2 &&
         line == kFooterMarker) {
@@ -2765,30 +2797,30 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
       std::string abs_path = backup_dir + "/" + filename;
       auto e = abs_path_to_size.find(abs_path);
       if (e == abs_path_to_size.end()) {
-        return Status::Corruption("Pathname in meta file not found on disk: " +
-                                  abs_path);
+        return IOStatus::Corruption(
+            "Pathname in meta file not found on disk: " + abs_path);
       }
       actual_size = e->second;
     }
 
     if (schema_major_version >= 2) {
       if (components.size() % 2 != 1) {
-        return Status::Corruption(
+        return IOStatus::Corruption(
             "Bad number of line components for file entry.");
       }
     } else {
       // Check restricted original schema
       if (components.size() < 3) {
-        return Status::Corruption("File checksum is missing for " + filename +
-                                  " in " + meta_filename_);
+        return IOStatus::Corruption("File checksum is missing for " + filename +
+                                    " in " + meta_filename_);
       }
       if (components[1] != kFileCrc32cFieldName) {
-        return Status::Corruption("Unknown checksum type for " + filename +
-                                  " in " + meta_filename_);
+        return IOStatus::Corruption("Unknown checksum type for " + filename +
+                                    " in " + meta_filename_);
       }
       if (components.size() > 3) {
-        return Status::Corruption("Extra data for entry " + filename + " in " +
-                                  meta_filename_);
+        return IOStatus::Corruption("Extra data for entry " + filename +
+                                    " in " + meta_filename_);
       }
     }
 
@@ -2801,21 +2833,21 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
         uint32_t checksum_value =
             static_cast<uint32_t>(strtoul(field_data.c_str(), nullptr, 10));
         if (field_data != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
-          return Status::Corruption("Invalid checksum value for " + filename +
-                                    " in " + meta_filename_);
+          return IOStatus::Corruption("Invalid checksum value for " + filename +
+                                      " in " + meta_filename_);
         }
         checksum_hex = ChecksumInt32ToHex(checksum_value);
       } else if (field_name == kFileSizeFieldName) {
         uint64_t ex_size =
             std::strtoull(field_data.c_str(), nullptr, /*base*/ 10);
         if (ex_size != actual_size) {
-          return Status::Corruption("For file " + filename + " expected size " +
-                                    ToString(ex_size) + " but found size" +
-                                    ToString(actual_size));
+          return IOStatus::Corruption(
+              "For file " + filename + " expected size " + ToString(ex_size) +
+              " but found size" + ToString(actual_size));
         }
       } else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
-        return Status::NotSupported("Unrecognized non-ignorable file field " +
-                                    field_name + " (from future version?)");
+        return IOStatus::NotSupported("Unrecognized non-ignorable file field " +
+                                      field_name + " (from future version?)");
       } else {
         // Warn the first time we see any particular unrecognized file field
         if (reported_ignored_fields->insert("file:" + field_name).second) {
@@ -2836,17 +2868,17 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
                               RateLimiter::OpType::kRead);
       }
       if (line.empty()) {
-        return Status::Corruption("Unexpected empty line");
+        return IOStatus::Corruption("Unexpected empty line");
       }
       auto space_pos = line.find_first_of(' ');
       if (space_pos == std::string::npos) {
-        return Status::Corruption("Expected footer field");
+        return IOStatus::Corruption("Expected footer field");
       }
       std::string field_name = line.substr(0, space_pos);
       std::string field_data = line.substr(space_pos + 1);
       if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
-        return Status::NotSupported("Unrecognized non-ignorable field " +
-                                    field_name + " (from future version?)");
+        return IOStatus::NotSupported("Unrecognized non-ignorable field " +
+                                      field_name + " (from future version?)");
       } else if (reported_ignored_fields->insert("footer:" + field_name)
                      .second) {
         // Warn the first time we see any particular unrecognized footer field
@@ -2858,39 +2890,40 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
   }
 
   {
-    Status s = backup_meta_reader->GetStatus();
-    if (!s.ok()) {
-      return s;
+    IOStatus io_s = backup_meta_reader->GetStatus();
+    if (!io_s.ok()) {
+      return io_s;
     }
   }
 
   if (num_files != files.size()) {
-    return Status::Corruption(
+    return IOStatus::Corruption(
         "Inconsistent number of files or missing/incomplete header in " +
         meta_filename_);
   }
 
   files_.reserve(files.size());
   for (const auto& file_info : files) {
-    Status s = AddFile(file_info);
-    if (!s.ok()) {
-      return s;
+    IOStatus io_s = AddFile(file_info);
+    if (!io_s.ok()) {
+      return io_s;
     }
   }
 
-  return Status::OK();
+  return IOStatus::OK();
 }
 
-Status BackupEngineImpl::BackupMeta::StoreToFile(
+IOStatus BackupEngineImpl::BackupMeta::StoreToFile(
     bool sync, const TEST_FutureSchemaVersion2Options* test_future_options) {
-  Status s;
-  std::unique_ptr<WritableFile> backup_meta_file;
-  EnvOptions env_options;
-  env_options.use_mmap_writes = false;
-  env_options.use_direct_writes = false;
-  s = env_->NewWritableFile(meta_tmp_filename_, &backup_meta_file, env_options);
-  if (!s.ok()) {
-    return s;
+  IOStatus io_s;
+  std::unique_ptr<FSWritableFile> backup_meta_file;
+  FileOptions file_options;
+  file_options.use_mmap_writes = false;
+  file_options.use_direct_writes = false;
+  io_s = fs_->NewWritableFile(meta_tmp_filename_, file_options,
+                              &backup_meta_file, nullptr);
+  if (!io_s.ok()) {
+    return io_s;
   }
 
   std::ostringstream buf;
@@ -2938,18 +2971,19 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(
     }
   }
 
-  s = backup_meta_file->Append(Slice(buf.str()));
+  io_s = backup_meta_file->Append(Slice(buf.str()), iooptions_, nullptr);
   IOSTATS_ADD(bytes_written, buf.str().size());
-  if (s.ok() && sync) {
-    s = backup_meta_file->Sync();
+  if (io_s.ok() && sync) {
+    io_s = backup_meta_file->Sync(iooptions_, nullptr);
   }
-  if (s.ok()) {
-    s = backup_meta_file->Close();
+  if (io_s.ok()) {
+    io_s = backup_meta_file->Close(iooptions_, nullptr);
   }
-  if (s.ok()) {
-    s = env_->RenameFile(meta_tmp_filename_, meta_filename_);
+  if (io_s.ok()) {
+    io_s = fs_->RenameFile(meta_tmp_filename_, meta_filename_, iooptions_,
+                           nullptr);
   }
-  return s;
+  return io_s;
 }
 
 Status BackupEngineReadOnly::Open(const BackupEngineOptions& options, Env* env,