* Fixed `DisableManualCompaction()` and `CompactRangeOptions::canceled` to cancel compactions even when they are waiting on conflicting compactions to finish
* Fixed a bug in which a successful `GetMergeOperands()` could transiently return `Status::MergeInProgress()`
* Return the correct error (Status::NotSupported()) to MultiGet caller when ReadOptions::async_io flag is true and IO uring is not enabled. Previously, Status::Corruption() was being returned when the actual failure was lack of async IO support.
+* Fixed a bug in DB open/recovery from a compressed WAL that was caused due to incorrect handling of certain record fragments with the same offset within a WAL block.
## 7.10.0 (01/23/2023)
### Behavior changes
size_t uncompressed_size = 0;
int remaining = 0;
+ const char* input = header + header_size;
do {
- remaining = uncompress_->Uncompress(header + header_size, length,
- uncompressed_buffer_.get(),
- &uncompressed_size);
+ remaining = uncompress_->Uncompress(
+ input, length, uncompressed_buffer_.get(), &uncompressed_size);
+ input = nullptr;
if (remaining < 0) {
buffer_.clear();
return kBadRecord;
uncompressed_record_.clear();
size_t uncompressed_size = 0;
int remaining = 0;
+ const char* input = header + header_size;
do {
- remaining = uncompress_->Uncompress(header + header_size, length,
- uncompressed_buffer_.get(),
- &uncompressed_size);
+ remaining = uncompress_->Uncompress(
+ input, length, uncompressed_buffer_.get(), &uncompressed_size);
+ input = nullptr;
if (remaining < 0) {
buffer_.clear();
*fragment_type_or_err = kBadRecord;
ASSERT_EQ("EOF", Read());
}
+TEST_P(CompressionLogTest, AlignedFragmentation) {
+ CompressionType compression_type = std::get<2>(GetParam());
+ if (!StreamingCompressionTypeSupported(compression_type)) {
+ ROCKSDB_GTEST_SKIP("Test requires support for compression type");
+ return;
+ }
+ ASSERT_OK(SetupTestEnv());
+ Random rnd(301);
+ int num_filler_records = 0;
+ // Keep writing small records until the next record will be aligned at the
+ // beginning of the block.
+ while ((WrittenBytes() & (kBlockSize - 1)) >= kHeaderSize) {
+ char entry = 'a';
+ ASSERT_OK(writer_->AddRecord(Slice(&entry, 1)));
+ num_filler_records++;
+ }
+ const std::vector<std::string> wal_entries = {
+ rnd.RandomBinaryString(3 * kBlockSize),
+ };
+ for (const std::string& wal_entry : wal_entries) {
+ Write(wal_entry);
+ }
+
+ for (int i = 0; i < num_filler_records; ++i) {
+ ASSERT_EQ("a", Read());
+ }
+ for (const std::string& wal_entry : wal_entries) {
+ ASSERT_EQ(wal_entry, Read());
+ }
+ ASSERT_EQ("EOF", Read());
+}
+
INSTANTIATE_TEST_CASE_P(
Compression, CompressionLogTest,
::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
for (int i = 0; i < (int)compressed_buffers.size(); i++) {
// Call uncompress till either the entire input is consumed or the output
// buffer size is equal to the allocated output buffer size.
+ const char* input = compressed_buffers[i].c_str();
do {
- ret_val = uncompress->Uncompress(compressed_buffers[i].c_str(),
- compressed_buffers[i].size(),
+ ret_val = uncompress->Uncompress(input, compressed_buffers[i].size(),
uncompressed_output_buffer, &output_pos);
+ input = nullptr;
if (output_pos > 0) {
std::string uncompressed_fragment;
uncompressed_fragment.assign(uncompressed_output_buffer, output_pos);
int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
char* output, size_t* output_pos) {
- assert(input != nullptr && output != nullptr && output_pos != nullptr);
+ assert(output != nullptr && output_pos != nullptr);
*output_pos = 0;
// Don't need to uncompress an empty input
if (input_size == 0) {
return 0;
}
#ifdef ZSTD_STREAMING
- if (input_buffer_.src != input) {
+ if (input) {
// New input
input_buffer_ = {input, input_size, /*pos=*/0};
}
compress_format_version_(compress_format_version),
max_output_len_(max_output_len) {}
virtual ~StreamingUncompress() = default;
- // uncompress should be called again with the same input if output_size is
- // equal to max_output_len or with the next input fragment.
+ // Uncompress can be called repeatedly to progressively process the same
+ // input buffer, or can be called with a new input buffer. When the input
+ // buffer is not fully consumed, the return value is > 0 or output_size
+ // == max_output_len. When calling uncompress to continue processing the
+ // same input buffer, the input argument should be nullptr.
// Parameters:
// input - buffer to uncompress
// input_size - size of input buffer