db/version_set.cc
db/wal_edit.cc
db/wal_manager.cc
+ db/wide/wide_column_serialization.cc
db/write_batch.cc
db/write_batch_base.cc
db/write_controller.cc
db/version_set_test.cc
db/wal_manager_test.cc
db/wal_edit_test.cc
+ db/wide/wide_column_serialization_test.cc
db/write_batch_test.cc
db/write_callback_test.cc
db/write_controller_test.cc
cache_reservation_manager_test: $(OBJ_DIR)/cache/cache_reservation_manager_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
+
+wide_column_serialization_test: $(OBJ_DIR)/db/wide/wide_column_serialization_test.o $(TEST_LIBRARY) $(LIBRARY)
+ $(AM_LINK)
+
#-------------------------------------------------
# make install related stuff
PREFIX ?= /usr/local
"db/version_set.cc",
"db/wal_edit.cc",
"db/wal_manager.cc",
+ "db/wide/wide_column_serialization.cc",
"db/write_batch.cc",
"db/write_batch_base.cc",
"db/write_controller.cc",
"db/version_set.cc",
"db/wal_edit.cc",
"db/wal_manager.cc",
+ "db/wide/wide_column_serialization.cc",
"db/write_batch.cc",
"db/write_batch_base.cc",
"db/write_controller.cc",
extra_compiler_flags=[])
+cpp_unittest_wrapper(name="wide_column_serialization_test",
+ srcs=["db/wide/wide_column_serialization_test.cc"],
+ deps=[":rocksdb_test_lib"],
+ extra_compiler_flags=[])
+
+
cpp_unittest_wrapper(name="work_queue_test",
srcs=["util/work_queue_test.cc"],
deps=[":rocksdb_test_lib"],
--- /dev/null
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/wide/wide_column_serialization.h"
+
+#include <algorithm>
+#include <cassert>
+#include <limits>
+
+#include "rocksdb/slice.h"
+#include "util/autovector.h"
+#include "util/coding.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Appends the serialized representation of `columns` to `output`.
+//
+// The encoding consists of the format version and the number of columns (both
+// varint32), an index with one (length-prefixed name, varint32 value size)
+// entry per column, and finally the raw value bytes in index order.
+//
+// Column names must be strictly ascending, i.e. sorted and free of duplicates.
+// This is the same condition Deserialize enforces on the index, so violating
+// it would produce an entity that cannot be read back; it is checked with an
+// assertion.
+//
+// Returns InvalidArgument if there are too many columns or if a column name or
+// value is too long for its size to be stored in 32 bits. All sizes are
+// validated before anything is written, so `output` is left unmodified when an
+// error is returned.
+Status WideColumnSerialization::Serialize(const WideColumns& columns,
+                                          std::string& output) {
+  // adjacent_find returns the first neighboring pair satisfying the predicate,
+  // so the predicate has to describe a violation: the left name being greater
+  // than OR EQUAL TO the right one.
+  assert(std::adjacent_find(columns.cbegin(), columns.cend(),
+                            [](const WideColumn& lhs, const WideColumn& rhs) {
+                              return lhs.name().compare(rhs.name()) >= 0;
+                            }) == columns.cend());
+
+  constexpr size_t kMaxSize =
+      static_cast<size_t>(std::numeric_limits<uint32_t>::max());
+
+  if (columns.size() > kMaxSize) {
+    return Status::InvalidArgument("Too many wide columns");
+  }
+
+  // Validate every size up front so that a failure cannot leave a partially
+  // written entity behind in the output buffer.
+  for (const auto& column : columns) {
+    if (column.name().size() > kMaxSize) {
+      return Status::InvalidArgument("Wide column name too long");
+    }
+
+    if (column.value().size() > kMaxSize) {
+      return Status::InvalidArgument("Wide column value too long");
+    }
+  }
+
+  PutVarint32(&output, kCurrentVersion);
+
+  PutVarint32(&output, static_cast<uint32_t>(columns.size()));
+
+  // Index: column names along with the sizes of the corresponding values
+  for (const auto& column : columns) {
+    PutLengthPrefixedSlice(&output, column.name());
+    PutVarint32(&output, static_cast<uint32_t>(column.value().size()));
+  }
+
+  // Payload: the column values, stored back to back in index order
+  for (const auto& column : columns) {
+    const Slice& value = column.value();
+
+    output.append(value.data(), value.size());
+  }
+
+  return Status::OK();
+}
+
+// Parses a serialized wide-column entity from `input` into `columns`, which is
+// expected to be empty on entry.
+//
+// `input` is consumed while the version, the number of columns, and the index
+// are decoded. The value payload that follows is read through a copy of the
+// slice, so `input` is not advanced past the column values.
+//
+// NOTE(review): the Slices stored in `columns` are built from pointers into the
+// buffer referenced by `input`; presumably that buffer has to outlive
+// `columns` -- confirm against Slice's ownership semantics.
+//
+// Returns Corruption if a field cannot be decoded, if the column names are not
+// strictly ascending, or if the payload is shorter than the value sizes in the
+// index require; returns NotSupported if the version is newer than
+// kCurrentVersion. On failure, `columns` may contain the entries decoded up to
+// the point of the error.
+Status WideColumnSerialization::Deserialize(Slice& input,
+                                            WideColumns& columns) {
+  assert(columns.empty());
+
+  uint32_t version = 0;
+  if (!GetVarint32(&input, &version)) {
+    return Status::Corruption("Error decoding wide column version");
+  }
+
+  if (version > kCurrentVersion) {
+    return Status::NotSupported("Unsupported wide column version");
+  }
+
+  uint32_t num_columns = 0;
+  if (!GetVarint32(&input, &num_columns)) {
+    return Status::Corruption("Error decoding number of wide columns");
+  }
+
+  if (!num_columns) {
+    return Status::OK();
+  }
+
+  // The column count comes from the (potentially corrupted) input, so it must
+  // not be trusted blindly when preallocating: a bogus count could request an
+  // enormous allocation. Each index entry takes up at least two bytes (one for
+  // the name length and one for the value size), which bounds the number of
+  // columns the remaining input can possibly describe. For well-formed input,
+  // the bound is never smaller than the actual column count.
+  const size_t max_reserve =
+      std::min(static_cast<size_t>(num_columns), input.size() / 2);
+
+  columns.reserve(max_reserve);
+
+  autovector<uint32_t, 16> column_value_sizes;
+  column_value_sizes.reserve(max_reserve);
+
+  // First pass: decode the index, i.e. the names and the value sizes
+  for (uint32_t i = 0; i < num_columns; ++i) {
+    Slice name;
+    if (!GetLengthPrefixedSlice(&input, &name)) {
+      return Status::Corruption("Error decoding wide column name");
+    }
+
+    if (!columns.empty() && columns.back().name().compare(name) >= 0) {
+      return Status::Corruption("Wide columns out of order");
+    }
+
+    // The value is filled in during the second pass below
+    columns.emplace_back(name, Slice());
+
+    uint32_t value_size = 0;
+    if (!GetVarint32(&input, &value_size)) {
+      return Status::Corruption("Error decoding wide column value size");
+    }
+
+    column_value_sizes.emplace_back(value_size);
+  }
+
+  // Second pass: carve the values out of the payload following the index
+  const Slice data(input);
+  size_t pos = 0;
+
+  for (uint32_t i = 0; i < num_columns; ++i) {
+    const uint32_t value_size = column_value_sizes[i];
+
+    // pos never exceeds data.size(), so the subtraction cannot underflow;
+    // comparing against the remaining size (instead of computing
+    // pos + value_size) avoids wraparound where size_t is only 32 bits wide.
+    if (value_size > data.size() - pos) {
+      return Status::Corruption("Error decoding wide column value payload");
+    }
+
+    columns[i].value() = Slice(data.data() + pos, value_size);
+
+    pos += value_size;
+  }
+
+  return Status::OK();
+}
+
+// Looks up the column called `column_name` in `columns` using binary search,
+// which relies on the columns being sorted by name. Returns an iterator to the
+// matching column, or columns.cend() if there is no column with that name.
+WideColumns::const_iterator WideColumnSerialization::Find(
+    const WideColumns& columns, const Slice& column_name) {
+  const auto end = columns.cend();
+
+  const auto name_precedes = [](const WideColumn& column,
+                                const Slice& target) {
+    return column.name().compare(target) < 0;
+  };
+
+  // First column whose name is not less than the one we are looking for
+  const auto candidate =
+      std::lower_bound(columns.cbegin(), end, column_name, name_precedes);
+
+  const bool found = candidate != end && candidate->name() == column_name;
+
+  return found ? candidate : end;
+}
+
+} // namespace ROCKSDB_NAMESPACE
--- /dev/null
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/status.h"
+#include "rocksdb/wide_columns.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class Slice;
+
+// Wide-column serialization/deserialization primitives.
+//
+// The two main parts of the layout are 1) a sorted index containing the column
+// names and column value sizes and 2) the column values themselves. Keeping the
+// index and the values separate will enable selectively reading column values
+// down the line. Note that currently the index has to be fully parsed in order
+// to find out the offset of each column value.
+//
+// Legend: cn = column name, cv = column value, cns = column name size, cvs =
+// column value size.
+//
+// +----------+--------------+----------+-------+----------+---...
+// | version  | # of columns |  cns 1   | cn 1  |  cvs 1   |
+// +----------+--------------+----------+-------+----------+---...
+// | varint32 |   varint32   | varint32 | bytes | varint32 |
+// +----------+--------------+----------+-------+----------+---...
+//
+// ... continued ...
+//
+// ...---+----------+-------+----------+-------+---...---+-------+
+//       |  cns N   | cn N  |  cvs N   | cv 1  |         | cv N  |
+// ...---+----------+-------+----------+-------+---...---+-------+
+//       | varint32 | bytes | varint32 | bytes |         | bytes |
+// ...---+----------+-------+----------+-------+---...---+-------+
+
+class WideColumnSerialization {
+ public:
+  // Format version written at the beginning of each serialized entity.
+  // Deserialize refuses input carrying a version greater than this value.
+  static constexpr uint32_t kCurrentVersion = 1;
+
+  // Appends the encoded form of `columns` to `output`. The columns are
+  // expected to be sorted by name. Returns InvalidArgument if the number of
+  // columns, or the size of any column name or value, does not fit in 32 bits.
+  static Status Serialize(const WideColumns& columns, std::string& output);
+
+  // Decodes an entity produced by Serialize from `input` into `columns`, which
+  // is expected to be empty. Returns Corruption if the input cannot be decoded
+  // or the column names are not strictly ascending, and NotSupported if the
+  // input has a version newer than kCurrentVersion.
+  static Status Deserialize(Slice& input, WideColumns& columns);
+
+  // Binary-searches `columns` (which must be sorted by name) for the column
+  // called `column_name`. Returns columns.cend() if there is no such column.
+  static WideColumns::const_iterator Find(const WideColumns& columns,
+                                          const Slice& column_name);
+};
+
+} // namespace ROCKSDB_NAMESPACE
--- /dev/null
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/wide/wide_column_serialization.h"
+
+#include "test_util/testharness.h"
+#include "util/coding.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Exercises the WideColumn constructors: every combination of C string,
+// std::string, and Slice arguments for the name and the value, followed by
+// piecewise construction. Each case checks that name() and value() compare
+// equal to what was passed in.
+TEST(WideColumnSerializationTest, Construct) {
+ constexpr char foo[] = "foo";
+ constexpr char bar[] = "bar";
+
+ const std::string foo_str(foo);
+ const std::string bar_str(bar);
+
+ const Slice foo_slice(foo_str);
+ const Slice bar_slice(bar_str);
+
+ // Value given as a C string; name given as C string / std::string / Slice
+ {
+ WideColumn column(foo, bar);
+ ASSERT_EQ(column.name(), foo);
+ ASSERT_EQ(column.value(), bar);
+ }
+
+ {
+ WideColumn column(foo_str, bar);
+ ASSERT_EQ(column.name(), foo_str);
+ ASSERT_EQ(column.value(), bar);
+ }
+
+ {
+ WideColumn column(foo_slice, bar);
+ ASSERT_EQ(column.name(), foo_slice);
+ ASSERT_EQ(column.value(), bar);
+ }
+
+ // Value given as a std::string; name given as C string / std::string / Slice
+ {
+ WideColumn column(foo, bar_str);
+ ASSERT_EQ(column.name(), foo);
+ ASSERT_EQ(column.value(), bar_str);
+ }
+
+ {
+ WideColumn column(foo_str, bar_str);
+ ASSERT_EQ(column.name(), foo_str);
+ ASSERT_EQ(column.value(), bar_str);
+ }
+
+ {
+ WideColumn column(foo_slice, bar_str);
+ ASSERT_EQ(column.name(), foo_slice);
+ ASSERT_EQ(column.value(), bar_str);
+ }
+
+ // Value given as a Slice; name given as C string / std::string / Slice
+ {
+ WideColumn column(foo, bar_slice);
+ ASSERT_EQ(column.name(), foo);
+ ASSERT_EQ(column.value(), bar_slice);
+ }
+
+ {
+ WideColumn column(foo_str, bar_slice);
+ ASSERT_EQ(column.name(), foo_str);
+ ASSERT_EQ(column.value(), bar_slice);
+ }
+
+ {
+ WideColumn column(foo_slice, bar_slice);
+ ASSERT_EQ(column.name(), foo_slice);
+ ASSERT_EQ(column.value(), bar_slice);
+ }
+
+ // Piecewise construction: each tuple supplies a pointer plus a length. The
+ // length is sizeof(foo) - 1 (resp. sizeof(bar) - 1), i.e. 3, so only the
+ // leading "foo" / "bar" of the longer literals is expected in the column.
+ {
+ constexpr char foo_name[] = "foo_name";
+ constexpr char bar_value[] = "bar_value";
+
+ WideColumn column(std::piecewise_construct,
+ std::forward_as_tuple(foo_name, sizeof(foo) - 1),
+ std::forward_as_tuple(bar_value, sizeof(bar) - 1));
+ ASSERT_EQ(column.name(), foo);
+ ASSERT_EQ(column.value(), bar);
+ }
+}
+
+// Round-trips a small set of columns through Serialize/Deserialize and checks
+// that Find locates exactly the columns that exist.
+TEST(WideColumnSerializationTest, SerializeDeserialize) {
+  WideColumns columns{{"foo", "bar"}, {"hello", "world"}};
+
+  std::string serialized;
+  ASSERT_OK(WideColumnSerialization::Serialize(columns, serialized));
+
+  Slice input(serialized);
+  WideColumns round_tripped;
+  ASSERT_OK(WideColumnSerialization::Deserialize(input, round_tripped));
+  ASSERT_EQ(columns, round_tripped);
+
+  // Names that are present resolve to the corresponding entries
+  const auto foo_it = WideColumnSerialization::Find(round_tripped, "foo");
+  ASSERT_NE(foo_it, round_tripped.cend());
+  ASSERT_EQ(*foo_it, round_tripped.front());
+
+  const auto hello_it = WideColumnSerialization::Find(round_tripped, "hello");
+  ASSERT_NE(hello_it, round_tripped.cend());
+  ASSERT_EQ(*hello_it, round_tripped.back());
+
+  // Names that are absent yield the end iterator: "fubar" sorts between the
+  // two existing names, while "snafu" sorts after the last one
+  const auto fubar_it = WideColumnSerialization::Find(round_tripped, "fubar");
+  ASSERT_EQ(fubar_it, round_tripped.cend());
+
+  const auto snafu_it = WideColumnSerialization::Find(round_tripped, "snafu");
+  ASSERT_EQ(snafu_it, round_tripped.cend());
+}
+
+// An empty buffer does not even contain the version field, which has to be
+// reported as corruption with a message mentioning the version.
+TEST(WideColumnSerializationTest, DeserializeVersionError) {
+  const std::string empty_buf;
+
+  Slice input(empty_buf);
+  WideColumns columns;
+  const Status status = WideColumnSerialization::Deserialize(input, columns);
+
+  ASSERT_TRUE(status.IsCorruption());
+  ASSERT_TRUE(std::strstr(status.getState(), "version"));
+}
+
+// A version number beyond the current one has to be rejected as not
+// supported, with a message mentioning the version.
+TEST(WideColumnSerializationTest, DeserializeUnsupportedVersion) {
+  constexpr uint32_t future_version = 1000;
+
+  std::string encoded;
+  PutVarint32(&encoded, future_version);
+
+  Slice input(encoded);
+  WideColumns columns;
+  const Status status = WideColumnSerialization::Deserialize(input, columns);
+
+  ASSERT_TRUE(status.IsNotSupported());
+  ASSERT_TRUE(std::strstr(status.getState(), "version"));
+}
+
+// A buffer that ends right after a valid version lacks the column count,
+// which has to be reported as corruption with a message mentioning the number
+// of columns.
+TEST(WideColumnSerializationTest, DeserializeNumberOfColumnsError) {
+  std::string encoded;
+  PutVarint32(&encoded, WideColumnSerialization::kCurrentVersion);
+
+  Slice input(encoded);
+  WideColumns columns;
+  const Status status = WideColumnSerialization::Deserialize(input, columns);
+
+  ASSERT_TRUE(status.IsCorruption());
+  ASSERT_TRUE(std::strstr(status.getState(), "number"));
+}
+
+// Builds a two-column entity piece by piece. After each piece is appended,
+// deserializing the still incomplete buffer has to fail with a Corruption
+// status naming the first missing part; only the complete buffer succeeds.
+TEST(WideColumnSerializationTest, DeserializeColumnsError) {
+  std::string buf;
+
+  // Deserializes the current contents of the buffer into a fresh set of
+  // columns and hands back the resulting status
+  const auto deserialize = [&buf]() {
+    Slice input(buf);
+    WideColumns columns;
+
+    return WideColumnSerialization::Deserialize(input, columns);
+  };
+
+  PutVarint32(&buf, WideColumnSerialization::kCurrentVersion);
+
+  constexpr uint32_t num_columns = 2;
+  PutVarint32(&buf, num_columns);
+
+  // Name of the first column is missing
+  {
+    const Status s = deserialize();
+    ASSERT_TRUE(s.IsCorruption());
+    ASSERT_TRUE(std::strstr(s.getState(), "name"));
+  }
+
+  constexpr char first_column_name[] = "foo";
+  PutLengthPrefixedSlice(&buf, first_column_name);
+
+  // Value size of the first column is missing
+  {
+    const Status s = deserialize();
+    ASSERT_TRUE(s.IsCorruption());
+    ASSERT_TRUE(std::strstr(s.getState(), "value size"));
+  }
+
+  constexpr uint32_t first_value_size = 16;
+  PutVarint32(&buf, first_value_size);
+
+  // Name of the second column is missing
+  {
+    const Status s = deserialize();
+    ASSERT_TRUE(s.IsCorruption());
+    ASSERT_TRUE(std::strstr(s.getState(), "name"));
+  }
+
+  constexpr char second_column_name[] = "hello";
+  PutLengthPrefixedSlice(&buf, second_column_name);
+
+  // Value size of the second column is missing
+  {
+    const Status s = deserialize();
+    ASSERT_TRUE(s.IsCorruption());
+    ASSERT_TRUE(std::strstr(s.getState(), "value size"));
+  }
+
+  constexpr uint32_t second_value_size = 64;
+  PutVarint32(&buf, second_value_size);
+
+  // Payload of the first column is missing
+  {
+    const Status s = deserialize();
+    ASSERT_TRUE(s.IsCorruption());
+    ASSERT_TRUE(std::strstr(s.getState(), "payload"));
+  }
+
+  buf.append(first_value_size, '0');
+
+  // Payload of the second column is missing
+  {
+    const Status s = deserialize();
+    ASSERT_TRUE(s.IsCorruption());
+    ASSERT_TRUE(std::strstr(s.getState(), "payload"));
+  }
+
+  buf.append(second_value_size, 'x');
+
+  // The entity is complete now, so decoding has to succeed
+  ASSERT_OK(deserialize());
+}
+
+// An index in which the second column name ("a") sorts before the first one
+// ("b") has to be rejected as corruption with a message mentioning the order.
+TEST(WideColumnSerializationTest, DeserializeColumnsOutOfOrder) {
+  constexpr uint32_t num_columns = 2;
+  constexpr char first_column_name[] = "b";
+  constexpr uint32_t first_value_size = 16;
+  constexpr char second_column_name[] = "a";
+
+  std::string encoded;
+  PutVarint32(&encoded, WideColumnSerialization::kCurrentVersion);
+  PutVarint32(&encoded, num_columns);
+  PutLengthPrefixedSlice(&encoded, first_column_name);
+  PutVarint32(&encoded, first_value_size);
+  PutLengthPrefixedSlice(&encoded, second_column_name);
+
+  Slice input(encoded);
+  WideColumns columns;
+  const Status status = WideColumnSerialization::Deserialize(input, columns);
+
+  ASSERT_TRUE(status.IsCorruption());
+  ASSERT_TRUE(std::strstr(status.getState(), "order"));
+}
+
+} // namespace ROCKSDB_NAMESPACE
+
+// Test binary entry point: initializes GoogleTest from the command line and
+// runs all registered tests, using their overall result as the exit code.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+
+  const int exit_code = RUN_ALL_TESTS();
+  return exit_code;
+}
--- /dev/null
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <tuple>
+#include <utility>
+#include <vector>
+
+#include "rocksdb/rocksdb_namespace.h"
+#include "rocksdb/slice.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// A wide column: a column name paired with a column value, each held in a
+// Slice member.
+class WideColumn {
+ public:
+  WideColumn() = default;
+
+  // Two-argument constructor that perfectly forwards `name` and `value` to the
+  // name and value Slices, respectively. Anything a Slice can be constructed
+  // from using a single argument (const char*, const std::string&,
+  // const Slice& etc.) can be used on either side, for example:
+  //
+  // constexpr char foo[] = "foo";
+  // const std::string bar("bar");
+  // WideColumn column(foo, bar);
+  template <typename N, typename V>
+  WideColumn(N&& name, V&& value)
+      : name_(std::forward<N>(name)), value_(std::forward<V>(value)) {}
+
+  // Piecewise constructor that unpacks `name_tuple` and `value_tuple` into the
+  // argument lists of the name and value Slice constructors, respectively.
+  // Use this one when a Slice has to be built from more than one argument,
+  // for example from a pointer and a length:
+  //
+  // constexpr char foo_name[] = "foo_name";
+  // constexpr char bar_value[] = "bar_value";
+  // WideColumn column(std::piecewise_construct,
+  //                   std::forward_as_tuple(foo_name, 3),
+  //                   std::forward_as_tuple(bar_value, 3));
+  template <typename NTuple, typename VTuple>
+  WideColumn(std::piecewise_construct_t, NTuple&& name_tuple,
+             VTuple&& value_tuple)
+      : name_(std::make_from_tuple<Slice>(std::forward<NTuple>(name_tuple))),
+        value_(std::make_from_tuple<Slice>(std::forward<VTuple>(value_tuple))) {
+  }
+
+  // Column name: read-only and mutable access
+  const Slice& name() const { return name_; }
+  Slice& name() { return name_; }
+
+  // Column value: read-only and mutable access
+  const Slice& value() const { return value_; }
+  Slice& value() { return value_; }
+
+ private:
+  Slice name_;
+  Slice value_;
+};
+
+// Two wide columns are equal if both their names and their values are equal.
+// Note: column names and values are compared bytewise.
+inline bool operator==(const WideColumn& lhs, const WideColumn& rhs) {
+  if (!(lhs.name() == rhs.name())) {
+    return false;
+  }
+
+  return lhs.value() == rhs.value();
+}
+
+// Negation of operator== for wide columns.
+inline bool operator!=(const WideColumn& lhs, const WideColumn& rhs) {
+  const bool equal = (lhs == rhs);
+  return !equal;
+}
+
+// A sequence of wide columns.
+using WideColumns = std::vector<WideColumn>;
+
+} // namespace ROCKSDB_NAMESPACE
db/version_set.cc \
db/wal_edit.cc \
db/wal_manager.cc \
+ db/wide/wide_column_serialization.cc \
db/write_batch.cc \
db/write_batch_base.cc \
db/write_controller.cc \
db/version_edit_test.cc \
db/version_set_test.cc \
db/wal_manager_test.cc \
+ db/wide/wide_column_serialization_test.cc \
db/write_batch_test.cc \
db/write_callback_test.cc \
db/write_controller_test.cc \
// full-fledged generic container.
//
// Currently we don't support:
-// * reserve()/shrink_to_fit()
+// * shrink_to_fit()
// If used correctly, in most cases, people should not touch the
// underlying vector at all.
// * random insert()/erase(), please only use push_back()/pop_back().
// Returns true if the container currently holds no elements.
bool empty() const { return size() == 0; }
+  // Total capacity: kSize plus whatever the backing vector has room for.
+  size_type capacity() const { return vect_.capacity() + kSize; }
+
+  // Makes sure capacity() is at least `cap`. Only the part of the request
+  // exceeding kSize needs to be forwarded to the backing vector.
+  void reserve(size_t cap) {
+    const bool exceeds_fixed_part = cap > kSize;
+    if (exceeds_fixed_part) {
+      const size_t vector_part = cap - kSize;
+      vect_.reserve(vector_part);
+    }
+
+    assert(cap <= capacity());
+  }
+
const_reference operator[](size_type n) const {
assert(n < size());
if (n < kSize) {