]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Added bytes XOR merge operator
authorPooya Shareghi <shareghi@gmail.com>
Tue, 6 Mar 2018 18:20:52 +0000 (10:20 -0800)
committerSagar Vemuri <svemuri@fb.com>
Tue, 6 Mar 2018 19:16:28 +0000 (11:16 -0800)
Summary:
Closes https://github.com/facebook/rocksdb/pull/575

I fixed the merge conflicts etc.
Closes https://github.com/facebook/rocksdb/pull/3065

Differential Revision: D7128233

Pulled By: sagar0

fbshipit-source-id: 2c23a48c9f0432c290b0cd16a12fb691bb37820c

CMakeLists.txt
TARGETS
src.mk
tools/db_bench_tool.cc
utilities/merge_operators.h
utilities/merge_operators/bytesxor.cc [new file with mode: 0644]
utilities/merge_operators/bytesxor.h [new file with mode: 0644]

index 3c0f67f5e3b9f1577666ed01190ec867fe6cea9f..c4b4116bd5f6046499b98fc80c40496630351279 100644 (file)
@@ -586,6 +586,7 @@ set(SOURCES
         utilities/leveldb_options/leveldb_options.cc
         utilities/lua/rocks_lua_compaction_filter.cc
         utilities/memory/memory_util.cc
+        utilities/merge_operators/bytesxor.cc
         utilities/merge_operators/max.cc
         utilities/merge_operators/put.cc
         utilities/merge_operators/string_append/stringappend.cc
diff --git a/TARGETS b/TARGETS
index 2846bb826e39781aaa19162f6ad3cf90abc1c9b8..ca0ab48d0f50441c2827a487195da420759936ee 100644 (file)
--- a/TARGETS
+++ b/TARGETS
@@ -242,6 +242,7 @@ cpp_library(
         "utilities/leveldb_options/leveldb_options.cc",
         "utilities/lua/rocks_lua_compaction_filter.cc",
         "utilities/memory/memory_util.cc",
+        "utilities/merge_operators/bytesxor.cc",
         "utilities/merge_operators/max.cc",
         "utilities/merge_operators/put.cc",
         "utilities/merge_operators/string_append/stringappend.cc",
diff --git a/src.mk b/src.mk
index 17bf94d76a0b29e8e3c4ff9aec81f8a11326cc00..c2ad87c54a6908aed18181e97a0f38977e0b1ce2 100644 (file)
--- a/src.mk
+++ b/src.mk
@@ -182,6 +182,7 @@ LIB_SOURCES =                                                   \
   utilities/merge_operators/string_append/stringappend.cc       \
   utilities/merge_operators/string_append/stringappend2.cc      \
   utilities/merge_operators/uint64add.cc                        \
+  utilities/merge_operators/bytesxor.cc                         \
   utilities/option_change_migration/option_change_migration.cc  \
   utilities/options/options_util.cc                             \
   utilities/persistent_cache/block_cache_tier.cc                \
index 064c5ccea6d42d6b336773e383ad125606be9570..15004dbe770c7850c9bef81d34ba7466b7c22fcb 100644 (file)
@@ -70,6 +70,7 @@
 #include "util/xxhash.h"
 #include "utilities/blob_db/blob_db.h"
 #include "utilities/merge_operators.h"
+#include "utilities/merge_operators/bytesxor.h"
 #include "utilities/persistent_cache/block_cache_tier.h"
 
 #ifdef OS_WIN
@@ -107,6 +108,7 @@ DEFINE_string(
     "readwhilemerging,"
     "readrandomwriterandom,"
     "updaterandom,"
+    "xorupdaterandom,"
     "randomwithverify,"
     "fill100K,"
     "crc32c,"
@@ -151,6 +153,8 @@ DEFINE_string(
     "\tprefixscanrandom      -- prefix scan N times in random order\n"
     "\tupdaterandom  -- N threads doing read-modify-write for random "
     "keys\n"
+    "\txorupdaterandom  -- N threads doing read-XOR-write for "
+    "random keys\n"
     "\tappendrandom  -- N threads doing read-modify-write with "
     "growing values\n"
     "\tmergerandom   -- same as updaterandom/appendrandom using merge"
@@ -2512,6 +2516,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
         method = &Benchmark::ReadRandomMergeRandom;
       } else if (name == "updaterandom") {
         method = &Benchmark::UpdateRandom;
+      } else if (name == "xorupdaterandom") {
+        method = &Benchmark::XORUpdateRandom;
       } else if (name == "appendrandom") {
         method = &Benchmark::AppendRandom;
       } else if (name == "mergerandom") {
@@ -4720,6 +4726,58 @@ void VerifyDBFromDB(std::string& truth_db_name) {
     thread->stats.AddMessage(msg);
   }
 
+  // Read-XOR-write for random keys. Xors the existing value with a randomly
+  // generated value, and stores the result. Assuming A in the array of bytes
+  // representing the existing value, we generate an array B of the same size,
+  // then compute C = A^B as C[i]=A[i]^B[i], and store C
+  void XORUpdateRandom(ThreadState* thread) {
+    ReadOptions options(FLAGS_verify_checksum, true);
+    RandomGenerator gen;
+    std::string existing_value;
+    int64_t found = 0;
+    Duration duration(FLAGS_duration, readwrites_);
+
+    BytesXOROperator xor_operator;
+
+    std::unique_ptr<const char[]> key_guard;
+    Slice key = AllocateKey(&key_guard);
+    // the number of iterations is the larger of read_ or write_
+    while (!duration.Done(1)) {
+      DB* db = SelectDB(thread);
+      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
+
+      auto status = db->Get(options, key, &existing_value);
+      if (status.ok()) {
+        ++found;
+      } else if (!status.IsNotFound()) {
+        fprintf(stderr, "Get returned an error: %s\n",
+                status.ToString().c_str());
+        exit(1);
+      }
+
+      Slice value = gen.Generate(value_size_);
+      std::string new_value;
+
+      if (status.ok()) {
+        Slice existing_value_slice = Slice(existing_value);
+        xor_operator.XOR(&existing_value_slice, value, &new_value);
+      } else {
+        xor_operator.XOR(nullptr, value, &new_value);
+      }
+
+      Status s = db->Put(write_options_, key, Slice(new_value));
+      if (!s.ok()) {
+        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
+        exit(1);
+      }
+      thread->stats.FinishedOps(nullptr, db, 1);
+    }
+    char msg[100];
+    snprintf(msg, sizeof(msg),
+             "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
+    thread->stats.AddMessage(msg);
+  }
+
   // Read-modify-write for random keys.
   // Each operation causes the key grow by value_size (simulating an append).
   // Generally used for benchmarking against merges of similar type
index 602a4d01aa3f5344c4382b5ffa037195086e1aac..40a19cf862a0168e337cb4a77b201ce1edbb2044 100644 (file)
@@ -3,13 +3,13 @@
 //  COPYING file in the root directory) and Apache 2.0 License
 //  (found in the LICENSE.Apache file in the root directory).
 //
-#ifndef MERGE_OPERATORS_H
-#define MERGE_OPERATORS_H
+#pragma once
+#include "rocksdb/merge_operator.h"
 
-#include <memory>
 #include <stdio.h>
 
-#include "rocksdb/merge_operator.h"
+#include <memory>
+#include <string>
 
 namespace rocksdb {
 
@@ -21,6 +21,7 @@ class MergeOperators {
   static std::shared_ptr<MergeOperator> CreateStringAppendOperator();
   static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator();
   static std::shared_ptr<MergeOperator> CreateMaxOperator();
+  static std::shared_ptr<MergeOperator> CreateBytesXOROperator();
 
   // Will return a different merge operator depending on the string.
   // TODO: Hook the "name" up to the actual Name() of the MergeOperators?
@@ -38,14 +39,13 @@ class MergeOperators {
       return CreateStringAppendTESTOperator();
     } else if (name == "max") {
       return CreateMaxOperator();
+    } else if (name == "bytesxor") {
+      return CreateBytesXOROperator();
     } else {
       // Empty or unknown, just return nullptr
       return nullptr;
     }
   }
-
 };
 
-} // namespace rocksdb
-
-#endif
+}  // namespace rocksdb
diff --git a/utilities/merge_operators/bytesxor.cc b/utilities/merge_operators/bytesxor.cc
new file mode 100644 (file)
index 0000000..cf9d976
--- /dev/null
@@ -0,0 +1,59 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include <algorithm>
+#include <string>
+
+#include "utilities/merge_operators/bytesxor.h"
+
+namespace rocksdb {
+
+std::shared_ptr<MergeOperator> MergeOperators::CreateBytesXOROperator() {
+  return std::make_shared<BytesXOROperator>();
+}
+
+bool BytesXOROperator::Merge(const Slice& key,
+                            const Slice* existing_value,
+                            const Slice& value,
+                            std::string* new_value,
+                            Logger* logger) const {
+  XOR(existing_value, value, new_value);
+  return true;
+}
+
+void BytesXOROperator::XOR(const Slice* existing_value,
+          const Slice& value, std::string* new_value) const {
+  if (!existing_value) {
+    new_value->clear();
+    new_value->assign(value.data(), value.size());
+    return;
+  }
+
+  size_t min_size = std::min(existing_value->size(), value.size());
+  size_t max_size = std::max(existing_value->size(), value.size());
+
+  new_value->clear();
+  new_value->reserve(max_size);
+
+  const char* existing_value_data = existing_value->data();
+  const char* value_data = value.data();
+
+  for (size_t i = 0; i < min_size; i++) {
+    new_value->push_back(existing_value_data[i] ^ value_data[i]);
+  }
+
+  if (existing_value->size() == max_size) {
+    for (size_t i = min_size; i < max_size; i++) {
+      new_value->push_back(existing_value_data[i]);
+    }
+  } else {
+         assert(value.size() == max_size);
+    for (size_t i = min_size; i < max_size; i++) {
+      new_value->push_back(value_data[i]);
+    }
+  }
+}
+
+}  // namespace rocksdb
diff --git a/utilities/merge_operators/bytesxor.h b/utilities/merge_operators/bytesxor.h
new file mode 100644 (file)
index 0000000..1562ca8
--- /dev/null
@@ -0,0 +1,42 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#ifndef UTILITIES_MERGE_OPERATORS_BYTESXOR_H_
+#define UTILITIES_MERGE_OPERATORS_BYTESXOR_H_
+
+#include <algorithm>
+#include <memory>
+#include <string>
+#include "rocksdb/env.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/slice.h"
+#include "util/coding.h"
+#include "utilities/merge_operators.h"
+
+namespace rocksdb {
+
+// A 'model' merge operator that XORs two (same sized) array of bytes.
+// Implemented as an AssociativeMergeOperator for simplicity and example.
+class BytesXOROperator : public AssociativeMergeOperator {
+ public:
+  // XORs the two array of bytes one byte at a time and stores the result
+  // in new_value. len is the number of xored bytes, and the length of new_value
+  virtual bool Merge(const Slice& key,
+                     const Slice* existing_value,
+                     const Slice& value,
+                     std::string* new_value,
+                     Logger* logger) const override;
+
+  virtual const char* Name() const override {
+    return "BytesXOR";
+  }
+
+  void XOR(const Slice* existing_value, const Slice& value,
+          std::string* new_value) const;
+};
+
+}  // namespace rocksdb
+
+#endif  // UTILITIES_MERGE_OPERATORS_BYTESXOR_H_