]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Fix a couple of missing cases of retry on corruption (#13007)
authoranand76 <anand1976@users.noreply.github.com>
Fri, 13 Sep 2024 20:56:49 +0000 (13:56 -0700)
committeranand76 <anand1976@users.noreply.github.com>
Tue, 17 Sep 2024 23:39:01 +0000 (16:39 -0700)
Summary:
For SST checksum mismatch corruptions in the read path, RocksDB retries the read if the underlying file system supports verification and reconstruction of data (`FSSupportedOps::kVerifyAndReconstructRead`). There were a couple of places where the retry was missing - reading the SST footer and the properties block. This PR fixes the retry in those cases.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13007

Test Plan: Add new unit tests

Reviewed By: jaykorean

Differential Revision: D62519186

Pulled By: anand1976

fbshipit-source-id: 50aa38f18f2a53531a9fc8d4ccdf34fbf034ed59

db/db_io_failure_test.cc
table/block_based/block_based_table_reader.cc
table/format.cc
table/format.h
table/meta_blocks.cc

index 9826ab6680fe3342f51d8be824af7e9dcb9455ef..b72c25998705796e23d5c8c8e7a177e2efd1ef76 100644 (file)
@@ -895,6 +895,81 @@ TEST_P(DBIOCorruptionTest, ManifestCorruptionRetry) {
   SyncPoint::GetInstance()->DisableProcessing();
 }
 
+TEST_P(DBIOCorruptionTest, FooterReadCorruptionRetry) {
+  Random rnd(300);
+  bool retry = false;
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "ReadFooterFromFileInternal:0", [&](void* arg) {
+        Slice* data = static_cast<Slice*>(arg);
+        if (!retry) {
+          std::memcpy(const_cast<char*>(data->data()),
+                      rnd.RandomString(static_cast<int>(data->size())).c_str(),
+                      data->size());
+          retry = true;
+        }
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(Put("key1", "val1"));
+  Status s = Flush();
+  if (std::get<2>(GetParam())) {
+    ASSERT_OK(s);
+    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
+    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
+              1);
+
+    std::string val;
+    ReadOptions ro;
+    ro.async_io = std::get<1>(GetParam());
+    ASSERT_OK(dbfull()->Get(ro, "key1", &val));
+    ASSERT_EQ(val, "val1");
+  } else {
+    ASSERT_NOK(s);
+    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
+    ASSERT_GT(stats()->getTickerCount(SST_FOOTER_CORRUPTION_COUNT), 0);
+  }
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(DBIOCorruptionTest, TablePropertiesCorruptionRetry) {
+  Random rnd(300);
+  bool retry = false;
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "ReadTablePropertiesHelper:0", [&](void* arg) {
+        Slice* data = static_cast<Slice*>(arg);
+        if (!retry) {
+          std::memcpy(const_cast<char*>(data->data()),
+                      rnd.RandomString(static_cast<int>(data->size())).c_str(),
+                      data->size());
+          retry = true;
+        }
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(Put("key1", "val1"));
+  Status s = Flush();
+  if (std::get<2>(GetParam())) {
+    ASSERT_OK(s);
+    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1);
+    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT),
+              1);
+
+    std::string val;
+    ReadOptions ro;
+    ro.async_io = std::get<1>(GetParam());
+    ASSERT_OK(dbfull()->Get(ro, "key1", &val));
+    ASSERT_EQ(val, "val1");
+  } else {
+    ASSERT_NOK(s);
+    ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0);
+  }
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 // The parameters are - 1. Use FS provided buffer, 2. Use async IO ReadOption,
 // 3. Retry with verify_and_reconstruct_read IOOption
 INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
index f9bdfc9b07d79a4708ea4853915d7d57f3b59145..fe45224d08eba494287e502d49f91a7c558d3c9e 100644 (file)
@@ -680,26 +680,12 @@ Status BlockBasedTable::Open(
   if (s.ok()) {
     s = ReadFooterFromFile(opts, file.get(), *ioptions.fs,
                            prefetch_buffer.get(), file_size, &footer,
-                           kBlockBasedTableMagicNumber);
-  }
-  // If the footer is corrupted and the FS supports checksum verification and
-  // correction, try reading the footer again
-  if (s.IsCorruption()) {
-    RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
-    if (CheckFSFeatureSupport(ioptions.fs.get(),
-                              FSSupportedOps::kVerifyAndReconstructRead)) {
-      IOOptions retry_opts = opts;
-      retry_opts.verify_and_reconstruct_read = true;
-      s = ReadFooterFromFile(retry_opts, file.get(), *ioptions.fs,
-                             prefetch_buffer.get(), file_size, &footer,
-                             kBlockBasedTableMagicNumber);
-      RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_COUNT);
-      if (s.ok()) {
-        RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
-      }
-    }
+                           kBlockBasedTableMagicNumber, ioptions.stats);
   }
   if (!s.ok()) {
+    if (s.IsCorruption()) {
+      RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT);
+    }
     return s;
   }
   if (!IsSupportedFormatVersion(footer.format_version())) {
index e5ba3c6a6b8ab2ff4b5e63329a277a0735751db0..7e1c2817dd59cd15d0f44c38235e3d396c456c59 100644 (file)
@@ -475,10 +475,12 @@ std::string Footer::ToString() const {
   return result;
 }
 
-Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
-                          FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
-                          uint64_t file_size, Footer* footer,
-                          uint64_t enforce_table_magic_number) {
+static Status ReadFooterFromFileInternal(const IOOptions& opts,
+                                         RandomAccessFileReader* file,
+                                         FileSystem& fs,
+                                         FilePrefetchBuffer* prefetch_buffer,
+                                         uint64_t file_size, Footer* footer,
+                                         uint64_t enforce_table_magic_number) {
   if (file_size < Footer::kMinEncodedLength) {
     return Status::Corruption("file is too short (" +
                               std::to_string(file_size) +
@@ -516,6 +518,8 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
     }
   }
 
+  TEST_SYNC_POINT_CALLBACK("ReadFooterFromFileInternal:0", &footer_input);
+
   // Check that we actually read the whole footer from the file. It may be
   // that size isn't correct.
   if (footer_input.size() < Footer::kMinEncodedLength) {
@@ -543,6 +547,30 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
   return Status::OK();
 }
 
+Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
+                          FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
+                          uint64_t file_size, Footer* footer,
+                          uint64_t enforce_table_magic_number,
+                          Statistics* stats) {
+  Status s =
+      ReadFooterFromFileInternal(opts, file, fs, prefetch_buffer, file_size,
+                                 footer, enforce_table_magic_number);
+  if (s.IsCorruption() &&
+      CheckFSFeatureSupport(&fs, FSSupportedOps::kVerifyAndReconstructRead)) {
+    IOOptions new_opts = opts;
+    new_opts.verify_and_reconstruct_read = true;
+    footer->Reset();
+    s = ReadFooterFromFileInternal(new_opts, file, fs, prefetch_buffer,
+                                   file_size, footer,
+                                   enforce_table_magic_number);
+    RecordTick(stats, FILE_READ_CORRUPTION_RETRY_COUNT);
+    if (s.ok()) {
+      RecordTick(stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
+    }
+  }
+  return s;
+}
+
 namespace {
 // Custom handling for the last byte of a block, to avoid invoking streaming
 // API to get an effective block checksum. This function is its own inverse
index cbd6d08fa3b6b8e836a682bc327363e105c08e8a..dac5d695be459cb4420103934c06b6897c3819ec 100644 (file)
@@ -186,6 +186,16 @@ class Footer {
   // Create empty. Populate using DecodeFrom.
   Footer() {}
 
+  void Reset() {
+    table_magic_number_ = kNullTableMagicNumber;
+    format_version_ = kInvalidFormatVersion;
+    base_context_checksum_ = 0;
+    metaindex_handle_ = BlockHandle::NullBlockHandle();
+    index_handle_ = BlockHandle::NullBlockHandle();
+    checksum_type_ = kInvalidChecksumType;
+    block_trailer_size_ = 0;
+  }
+
   // Deserialize a footer (populate fields) from `input` and check for various
   // corruptions. `input_offset` is the offset within the target file of
   // `input` buffer, which is needed for verifying format_version >= 6 footer.
@@ -304,7 +314,8 @@ class FooterBuilder {
 Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
                           FileSystem& fs, FilePrefetchBuffer* prefetch_buffer,
                           uint64_t file_size, Footer* footer,
-                          uint64_t enforce_table_magic_number = 0);
+                          uint64_t enforce_table_magic_number = 0,
+                          Statistics* stats = nullptr);
 
 // Computes a checksum using the given ChecksumType. Sometimes we need to
 // include one more input byte logically at the end but not part of the main
index fdd10ed4b2cb7968be500adcc7b3ca806fbb5c9a..3690c0363cfcff119732cb1c1810e98885e3cef3 100644 (file)
@@ -259,180 +259,228 @@ Status ReadTablePropertiesHelper(
     MemoryAllocator* memory_allocator) {
   assert(table_properties);
 
-  // If this is an external SST file ingested with write_global_seqno set to
-  // true, then we expect the checksum mismatch because checksum was written
-  // by SstFileWriter, but its global seqno in the properties block may have
-  // been changed during ingestion. For this reason, we initially read
-  // and process without checksum verification, then later try checksum
-  // verification so that if it fails, we can copy to a temporary buffer with
-  // global seqno set to its original value, i.e. 0, and attempt checksum
-  // verification again.
-  ReadOptions modified_ro = ro;
-  modified_ro.verify_checksums = false;
-  BlockContents block_contents;
-  BlockFetcher block_fetcher(file, prefetch_buffer, footer, modified_ro, handle,
-                             &block_contents, ioptions, false /* decompress */,
-                             false /*maybe_compressed*/, BlockType::kProperties,
-                             UncompressionDict::GetEmptyDict(),
-                             PersistentCacheOptions::kEmpty, memory_allocator);
-  Status s = block_fetcher.ReadBlockContents();
-  if (!s.ok()) {
-    return s;
-  }
-
-  // Unfortunately, Block::size() might not equal block_contents.data.size(),
-  // and Block hides block_contents
-  uint64_t block_size = block_contents.data.size();
-  Block properties_block(std::move(block_contents));
-  std::unique_ptr<MetaBlockIter> iter(properties_block.NewMetaIterator());
-
-  std::unique_ptr<TableProperties> new_table_properties{new TableProperties};
-  // All pre-defined properties of type uint64_t
-  std::unordered_map<std::string, uint64_t*> predefined_uint64_properties = {
-      {TablePropertiesNames::kOriginalFileNumber,
-       &new_table_properties->orig_file_number},
-      {TablePropertiesNames::kDataSize, &new_table_properties->data_size},
-      {TablePropertiesNames::kIndexSize, &new_table_properties->index_size},
-      {TablePropertiesNames::kIndexPartitions,
-       &new_table_properties->index_partitions},
-      {TablePropertiesNames::kTopLevelIndexSize,
-       &new_table_properties->top_level_index_size},
-      {TablePropertiesNames::kIndexKeyIsUserKey,
-       &new_table_properties->index_key_is_user_key},
-      {TablePropertiesNames::kIndexValueIsDeltaEncoded,
-       &new_table_properties->index_value_is_delta_encoded},
-      {TablePropertiesNames::kFilterSize, &new_table_properties->filter_size},
-      {TablePropertiesNames::kRawKeySize, &new_table_properties->raw_key_size},
-      {TablePropertiesNames::kRawValueSize,
-       &new_table_properties->raw_value_size},
-      {TablePropertiesNames::kNumDataBlocks,
-       &new_table_properties->num_data_blocks},
-      {TablePropertiesNames::kNumEntries, &new_table_properties->num_entries},
-      {TablePropertiesNames::kNumFilterEntries,
-       &new_table_properties->num_filter_entries},
-      {TablePropertiesNames::kDeletedKeys,
-       &new_table_properties->num_deletions},
-      {TablePropertiesNames::kMergeOperands,
-       &new_table_properties->num_merge_operands},
-      {TablePropertiesNames::kNumRangeDeletions,
-       &new_table_properties->num_range_deletions},
-      {TablePropertiesNames::kFormatVersion,
-       &new_table_properties->format_version},
-      {TablePropertiesNames::kFixedKeyLen,
-       &new_table_properties->fixed_key_len},
-      {TablePropertiesNames::kColumnFamilyId,
-       &new_table_properties->column_family_id},
-      {TablePropertiesNames::kCreationTime,
-       &new_table_properties->creation_time},
-      {TablePropertiesNames::kOldestKeyTime,
-       &new_table_properties->oldest_key_time},
-      {TablePropertiesNames::kFileCreationTime,
-       &new_table_properties->file_creation_time},
-      {TablePropertiesNames::kSlowCompressionEstimatedDataSize,
-       &new_table_properties->slow_compression_estimated_data_size},
-      {TablePropertiesNames::kFastCompressionEstimatedDataSize,
-       &new_table_properties->fast_compression_estimated_data_size},
-      {TablePropertiesNames::kTailStartOffset,
-       &new_table_properties->tail_start_offset},
-      {TablePropertiesNames::kUserDefinedTimestampsPersisted,
-       &new_table_properties->user_defined_timestamps_persisted},
-  };
-
-  std::string last_key;
-  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-    s = iter->status();
-    if (!s.ok()) {
-      break;
+  Status s;
+  bool retry = false;
+  while (true) {
+    BlockContents block_contents;
+    size_t len = handle.size() + footer.GetBlockTrailerSize();
+    // If this is an external SST file ingested with write_global_seqno set to
+    // true, then we expect the checksum mismatch because checksum was written
+    // by SstFileWriter, but its global seqno in the properties block may have
+    // been changed during ingestion. For this reason, we initially read
+    // and process without checksum verification, then later try checksum
+    // verification so that if it fails, we can copy to a temporary buffer with
+    // global seqno set to its original value, i.e. 0, and attempt checksum
+    // verification again.
+    if (!retry) {
+      ReadOptions modified_ro = ro;
+      modified_ro.verify_checksums = false;
+      BlockFetcher block_fetcher(
+          file, prefetch_buffer, footer, modified_ro, handle, &block_contents,
+          ioptions, false /* decompress */, false /*maybe_compressed*/,
+          BlockType::kProperties, UncompressionDict::GetEmptyDict(),
+          PersistentCacheOptions::kEmpty, memory_allocator);
+      s = block_fetcher.ReadBlockContents();
+      if (!s.ok()) {
+        return s;
+      }
+      assert(block_fetcher.GetBlockSizeWithTrailer() == len);
+      TEST_SYNC_POINT_CALLBACK("ReadTablePropertiesHelper:0",
+                               &block_contents.data);
+    } else {
+      assert(s.IsCorruption());
+      // If retrying, use a stronger file system read to check and correct
+      // data corruption
+      IOOptions opts;
+      if (PrepareIOFromReadOptions(ro, ioptions.clock, opts) !=
+          IOStatus::OK()) {
+        return s;
+      }
+      opts.verify_and_reconstruct_read = true;
+      std::unique_ptr<char[]> data(new char[len]);
+      Slice result;
+      IOStatus io_s =
+          file->Read(opts, handle.offset(), len, &result, data.get(), nullptr);
+      RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_COUNT);
+      if (!io_s.ok()) {
+        ROCKS_LOG_INFO(ioptions.info_log,
+                       "Reading properties block failed - %s",
+                       io_s.ToString().c_str());
+        // Return the original corruption error as that's more serious
+        return s;
+      }
+      if (result.size() < len) {
+        return Status::Corruption("Reading properties block failed - " +
+                                  std::to_string(result.size()) +
+                                  " bytes read");
+      }
+      RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
+      block_contents = BlockContents(std::move(data), handle.size());
     }
 
-    auto key = iter->key().ToString();
-    // properties block should be strictly sorted with no duplicate key.
-    if (!last_key.empty() &&
-        BytewiseComparator()->Compare(key, last_key) <= 0) {
-      s = Status::Corruption("properties unsorted");
-      break;
-    }
-    last_key = key;
+    // Unfortunately, Block::size() might not equal block_contents.data.size(),
+    // and Block hides block_contents
+    uint64_t block_size = block_contents.data.size();
+    Block properties_block(std::move(block_contents));
+    std::unique_ptr<MetaBlockIter> iter(properties_block.NewMetaIterator());
+
+    std::unique_ptr<TableProperties> new_table_properties{new TableProperties};
+    // All pre-defined properties of type uint64_t
+    std::unordered_map<std::string, uint64_t*> predefined_uint64_properties = {
+        {TablePropertiesNames::kOriginalFileNumber,
+         &new_table_properties->orig_file_number},
+        {TablePropertiesNames::kDataSize, &new_table_properties->data_size},
+        {TablePropertiesNames::kIndexSize, &new_table_properties->index_size},
+        {TablePropertiesNames::kIndexPartitions,
+         &new_table_properties->index_partitions},
+        {TablePropertiesNames::kTopLevelIndexSize,
+         &new_table_properties->top_level_index_size},
+        {TablePropertiesNames::kIndexKeyIsUserKey,
+         &new_table_properties->index_key_is_user_key},
+        {TablePropertiesNames::kIndexValueIsDeltaEncoded,
+         &new_table_properties->index_value_is_delta_encoded},
+        {TablePropertiesNames::kFilterSize, &new_table_properties->filter_size},
+        {TablePropertiesNames::kRawKeySize,
+         &new_table_properties->raw_key_size},
+        {TablePropertiesNames::kRawValueSize,
+         &new_table_properties->raw_value_size},
+        {TablePropertiesNames::kNumDataBlocks,
+         &new_table_properties->num_data_blocks},
+        {TablePropertiesNames::kNumEntries, &new_table_properties->num_entries},
+        {TablePropertiesNames::kNumFilterEntries,
+         &new_table_properties->num_filter_entries},
+        {TablePropertiesNames::kDeletedKeys,
+         &new_table_properties->num_deletions},
+        {TablePropertiesNames::kMergeOperands,
+         &new_table_properties->num_merge_operands},
+        {TablePropertiesNames::kNumRangeDeletions,
+         &new_table_properties->num_range_deletions},
+        {TablePropertiesNames::kFormatVersion,
+         &new_table_properties->format_version},
+        {TablePropertiesNames::kFixedKeyLen,
+         &new_table_properties->fixed_key_len},
+        {TablePropertiesNames::kColumnFamilyId,
+         &new_table_properties->column_family_id},
+        {TablePropertiesNames::kCreationTime,
+         &new_table_properties->creation_time},
+        {TablePropertiesNames::kOldestKeyTime,
+         &new_table_properties->oldest_key_time},
+        {TablePropertiesNames::kFileCreationTime,
+         &new_table_properties->file_creation_time},
+        {TablePropertiesNames::kSlowCompressionEstimatedDataSize,
+         &new_table_properties->slow_compression_estimated_data_size},
+        {TablePropertiesNames::kFastCompressionEstimatedDataSize,
+         &new_table_properties->fast_compression_estimated_data_size},
+        {TablePropertiesNames::kTailStartOffset,
+         &new_table_properties->tail_start_offset},
+        {TablePropertiesNames::kUserDefinedTimestampsPersisted,
+         &new_table_properties->user_defined_timestamps_persisted},
+    };
+
+    std::string last_key;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      s = iter->status();
+      if (!s.ok()) {
+        break;
+      }
 
-    auto raw_val = iter->value();
-    auto pos = predefined_uint64_properties.find(key);
+      auto key = iter->key().ToString();
+      // properties block should be strictly sorted with no duplicate key.
+      if (!last_key.empty() &&
+          BytewiseComparator()->Compare(key, last_key) <= 0) {
+        s = Status::Corruption("properties unsorted");
+        break;
+      }
+      last_key = key;
 
-    if (key == ExternalSstFilePropertyNames::kGlobalSeqno) {
-      new_table_properties->external_sst_file_global_seqno_offset =
-          handle.offset() + iter->ValueOffset();
-    }
+      auto raw_val = iter->value();
+      auto pos = predefined_uint64_properties.find(key);
 
-    if (pos != predefined_uint64_properties.end()) {
-      if (key == TablePropertiesNames::kDeletedKeys ||
-          key == TablePropertiesNames::kMergeOperands) {
-        // Insert in user-collected properties for API backwards compatibility
+      if (key == ExternalSstFilePropertyNames::kGlobalSeqno) {
+        new_table_properties->external_sst_file_global_seqno_offset =
+            handle.offset() + iter->ValueOffset();
+      }
+
+      if (pos != predefined_uint64_properties.end()) {
+        if (key == TablePropertiesNames::kDeletedKeys ||
+            key == TablePropertiesNames::kMergeOperands) {
+          // Insert in user-collected properties for API backwards compatibility
+          new_table_properties->user_collected_properties.insert(
+              {key, raw_val.ToString()});
+        }
+        // handle predefined rocksdb properties
+        uint64_t val;
+        if (!GetVarint64(&raw_val, &val)) {
+          // skip malformed value
+          auto error_msg =
+              "Detect malformed value in properties meta-block:"
+              "\tkey: " +
+              key + "\tval: " + raw_val.ToString();
+          ROCKS_LOG_ERROR(ioptions.logger, "%s", error_msg.c_str());
+          continue;
+        }
+        *(pos->second) = val;
+      } else if (key == TablePropertiesNames::kDbId) {
+        new_table_properties->db_id = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kDbSessionId) {
+        new_table_properties->db_session_id = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kDbHostId) {
+        new_table_properties->db_host_id = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kFilterPolicy) {
+        new_table_properties->filter_policy_name = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kColumnFamilyName) {
+        new_table_properties->column_family_name = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kComparator) {
+        new_table_properties->comparator_name = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kMergeOperator) {
+        new_table_properties->merge_operator_name = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kPrefixExtractorName) {
+        new_table_properties->prefix_extractor_name = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kPropertyCollectors) {
+        new_table_properties->property_collectors_names = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kCompression) {
+        new_table_properties->compression_name = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kCompressionOptions) {
+        new_table_properties->compression_options = raw_val.ToString();
+      } else if (key == TablePropertiesNames::kSequenceNumberTimeMapping) {
+        new_table_properties->seqno_to_time_mapping = raw_val.ToString();
+      } else {
+        // handle user-collected properties
         new_table_properties->user_collected_properties.insert(
             {key, raw_val.ToString()});
       }
-      // handle predefined rocksdb properties
-      uint64_t val;
-      if (!GetVarint64(&raw_val, &val)) {
-        // skip malformed value
-        auto error_msg =
-            "Detect malformed value in properties meta-block:"
-            "\tkey: " +
-            key + "\tval: " + raw_val.ToString();
-        ROCKS_LOG_ERROR(ioptions.logger, "%s", error_msg.c_str());
-        continue;
-      }
-      *(pos->second) = val;
-    } else if (key == TablePropertiesNames::kDbId) {
-      new_table_properties->db_id = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kDbSessionId) {
-      new_table_properties->db_session_id = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kDbHostId) {
-      new_table_properties->db_host_id = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kFilterPolicy) {
-      new_table_properties->filter_policy_name = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kColumnFamilyName) {
-      new_table_properties->column_family_name = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kComparator) {
-      new_table_properties->comparator_name = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kMergeOperator) {
-      new_table_properties->merge_operator_name = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kPrefixExtractorName) {
-      new_table_properties->prefix_extractor_name = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kPropertyCollectors) {
-      new_table_properties->property_collectors_names = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kCompression) {
-      new_table_properties->compression_name = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kCompressionOptions) {
-      new_table_properties->compression_options = raw_val.ToString();
-    } else if (key == TablePropertiesNames::kSequenceNumberTimeMapping) {
-      new_table_properties->seqno_to_time_mapping = raw_val.ToString();
-    } else {
-      // handle user-collected properties
-      new_table_properties->user_collected_properties.insert(
-          {key, raw_val.ToString()});
     }
-  }
 
-  // Modified version of BlockFetcher checksum verification
-  // (See write_global_seqno comment above)
-  if (s.ok() && footer.GetBlockTrailerSize() > 0) {
-    s = VerifyBlockChecksum(footer, properties_block.data(), block_size,
-                            file->file_name(), handle.offset());
-    if (s.IsCorruption()) {
-      if (new_table_properties->external_sst_file_global_seqno_offset != 0) {
-        std::string tmp_buf(properties_block.data(),
-                            block_fetcher.GetBlockSizeWithTrailer());
-        uint64_t global_seqno_offset =
-            new_table_properties->external_sst_file_global_seqno_offset -
-            handle.offset();
-        EncodeFixed64(&tmp_buf[static_cast<size_t>(global_seqno_offset)], 0);
-        s = VerifyBlockChecksum(footer, tmp_buf.data(), block_size,
-                                file->file_name(), handle.offset());
+    // Modified version of BlockFetcher checksum verification
+    // (See write_global_seqno comment above)
+    if (s.ok() && footer.GetBlockTrailerSize() > 0) {
+      s = VerifyBlockChecksum(footer, properties_block.data(), block_size,
+                              file->file_name(), handle.offset());
+      if (s.IsCorruption()) {
+        if (new_table_properties->external_sst_file_global_seqno_offset != 0) {
+          std::string tmp_buf(properties_block.data(), len);
+          uint64_t global_seqno_offset =
+              new_table_properties->external_sst_file_global_seqno_offset -
+              handle.offset();
+          EncodeFixed64(&tmp_buf[static_cast<size_t>(global_seqno_offset)], 0);
+          s = VerifyBlockChecksum(footer, tmp_buf.data(), block_size,
+                                  file->file_name(), handle.offset());
+        }
       }
     }
-  }
 
-  if (s.ok()) {
-    *table_properties = std::move(new_table_properties);
+    // If we detected a corruption and the file system supports verification
+    // and reconstruction, retry the read
+    if (s.IsCorruption() && !retry &&
+        CheckFSFeatureSupport(ioptions.fs.get(),
+                              FSSupportedOps::kVerifyAndReconstructRead)) {
+      retry = true;
+    } else {
+      if (s.ok()) {
+        *table_properties = std::move(new_table_properties);
+      }
+      break;
+    }
   }
 
   return s;