]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Add option to use MultiGet in db_stress (#5264)
authoranand76 <anand76@devvm1373.frc2.facebook.com>
Thu, 2 May 2019 06:04:03 +0000 (23:04 -0700)
committeranand76 <anand76@devvm1373.frc2.facebook.com>
Fri, 24 May 2019 20:18:02 +0000 (13:18 -0700)
Summary:
The new option will pick a batch size randomly in the range 1-64. It will then space the keys in the batch by random intervals.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5264

Differential Revision: D15175522

Pulled By: anand1976

fbshipit-source-id: c16baa69d0f1ff4cf53c55c813ddd82c8aeb58fc

tools/db_crashtest.py
tools/db_stress.cc

index 6c7fbabbf111859649098f0a1f18b97ba16cad93..62f72f2b5eb66e632e23fad743470ad5b9f3dda2 100644 (file)
@@ -65,6 +65,7 @@ default_params = {
     "writepercent": 35,
     "format_version": lambda: random.randint(2, 4),
     "index_block_restart_interval": lambda: random.choice(range(1, 16)),
+    "use_multiget" : lambda: random.randint(0, 1),
 }
 
 _TEST_DIR_ENV_VAR = 'TEST_TMPDIR'
index 2ecd2aa6d13bc8a12a04ba8920e67dc6b2c099c7..97755fe962a42edc2e98c06a4f201133ed24fa66 100644 (file)
@@ -455,6 +455,9 @@ DEFINE_uint64(snapshot_hold_ops, 0,
               "If non-zero, then releases snapshots N operations after they're "
               "acquired.");
 
+DEFINE_bool(use_multiget, false,
+            "If set, use the batched MultiGet API for reads");
+
 static bool ValidateInt32Percent(const char* flagname, int32_t value) {
   if (value < 0 || value>100) {
     fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n",
@@ -1725,6 +1728,27 @@ class StressTest {
     return base_key + thread->rand.Next() % FLAGS_active_width;
   }
 
+  static std::vector<int64_t> GenerateNKeys(
+      ThreadState* thread,
+      int num_keys,
+      uint64_t iteration) {
+    const double completed_ratio =
+        static_cast<double>(iteration) / FLAGS_ops_per_thread;
+    const int64_t base_key = static_cast<int64_t>(
+        completed_ratio * (FLAGS_max_key - FLAGS_active_width));
+    std::vector<int64_t> keys;
+    keys.reserve(num_keys);
+    int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
+    keys.push_back(next_key);
+    for (int i = 1; i < num_keys; ++i) {
+      // This may result in some duplicate keys
+      next_key = next_key + thread->rand.Next() %
+        (FLAGS_active_width - (next_key - base_key));
+      keys.push_back(next_key);
+    }
+    return keys;
+  }
+
   static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) {
     size_t value_sz =
         ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
@@ -2162,7 +2186,14 @@ class StressTest {
       int prob_op = thread->rand.Uniform(100);
       if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
         // OPERATION read
-        TestGet(thread, read_opts, rand_column_families, rand_keys);
+        if (FLAGS_use_multiget) {
+          int num_keys = thread->rand.Uniform(64);
+          rand_keys = GenerateNKeys(thread, num_keys, i);
+          TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
+          i += num_keys - 1;
+        } else {
+          TestGet(thread, read_opts, rand_column_families, rand_keys);
+        }
       } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
         // OPERATION prefix scan
         // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
@@ -2211,6 +2242,11 @@ class StressTest {
       const std::vector<int>& rand_column_families,
       const std::vector<int64_t>& rand_keys) = 0;
 
+  virtual std::vector<Status> TestMultiGet(ThreadState* thread,
+      const ReadOptions& read_opts,
+      const std::vector<int>& rand_column_families,
+      const std::vector<int64_t>& rand_keys) = 0;
+
   virtual Status TestPrefixScan(ThreadState* thread,
       const ReadOptions& read_opts,
       const std::vector<int>& rand_column_families,
@@ -2546,6 +2582,8 @@ class StressTest {
     fprintf(stdout, "Checksum type             : %s\n", checksum.c_str());
     fprintf(stdout, "Max subcompactions        : %" PRIu64 "\n",
             FLAGS_subcompactions);
+    fprintf(stdout, "Use MultiGet              : %s\n",
+            FLAGS_use_multiget ? "true" : "false");
 
     const char* memtablerep = "";
     switch (FLAGS_rep_factory) {
@@ -3012,6 +3050,38 @@ class NonBatchedOpsStressTest : public StressTest {
     return s;
   }
 
+  virtual std::vector<Status> TestMultiGet(ThreadState* thread,
+      const ReadOptions& read_opts,
+      const std::vector<int>& rand_column_families,
+      const std::vector<int64_t>& rand_keys) {
+    size_t num_keys = rand_keys.size();
+    std::vector<std::string> key_str;
+    std::vector<Slice> keys;
+    std::vector<PinnableSlice> values(num_keys);
+    std::vector<Status> statuses(num_keys);
+    ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+
+    for (size_t i = 0; i < num_keys; ++i) {
+      key_str.emplace_back(Key(rand_keys[i]));
+      keys.emplace_back(key_str.back());
+    }
+    db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
+                  statuses.data());
+    for (const auto& s : statuses) {
+      if (s.ok()) {
+        // found case
+        thread->stats.AddGets(1, 1);
+      } else if (s.IsNotFound()) {
+        // not found case
+        thread->stats.AddGets(1, 0);
+      } else {
+        // errors case
+        thread->stats.AddErrors(1);
+      }
+    }
+    return statuses;
+  }
+
   virtual Status TestPrefixScan(ThreadState* thread,
       const ReadOptions& read_opts,
       const std::vector<int>& rand_column_families,
@@ -3532,6 +3602,70 @@ class BatchedOpsStressTest : public StressTest {
     return s;
   }
 
+  virtual std::vector<Status> TestMultiGet(ThreadState* thread,
+      const ReadOptions& readoptions,
+      const std::vector<int>& rand_column_families,
+      const std::vector<int64_t>& rand_keys) {
+    int num_keys = rand_keys.size();
+    std::vector<Status> statuses(num_keys);
+    std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+    for (int key = 0; key < 10; ++key) {
+      std::vector<Slice> key_slices;
+      std::vector<PinnableSlice> values(num_keys);
+      ReadOptions readoptionscopy = readoptions;
+      readoptionscopy.snapshot = db_->GetSnapshot();
+      std::vector<std::string> key_str;
+      std::string from_db;
+      ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+
+      for (int rand_key = 0; rand_key < num_keys; ++rand_key) {
+        key_str.emplace_back(keys[key] + Key(rand_keys[rand_key]));
+        key_slices.emplace_back(key_str.back());
+      }
+      db_->MultiGet(readoptionscopy, cfh, num_keys, key_slices.data(),
+          values.data(), statuses.data());
+      for (int i = 0; i < num_keys; i++) {
+        Status s = statuses[i];
+        if (!s.ok() && !s.IsNotFound()) {
+          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
+          thread->stats.AddErrors(1);
+          // we continue after error rather than exiting so that we can
+          // find more errors if any
+        } else if (s.IsNotFound()) {
+          thread->stats.AddGets(1, 0);
+        } else {
+          char expected_prefix = (keys[key])[0];
+          char actual_prefix = (values[i])[0];
+          if (actual_prefix != expected_prefix) {
+            fprintf(stderr, "error expected prefix = %c actual = %c\n",
+                    expected_prefix, actual_prefix);
+          }
+          std::string str;
+          str.assign(values[i].data(), values[i].size());
+          values[i].Reset();
+          str[0] = ' '; // blank out the differing character
+          values[i].PinSelf(str);
+          thread->stats.AddGets(1, 1);
+        }
+      }
+      db_->ReleaseSnapshot(readoptionscopy.snapshot);
+
+      // Now that we retrieved all values, check that they all match
+      for (int i = 1; i < num_keys; i++) {
+        if (values[i] != values[0]) {
+          fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
+                  key_str[i].c_str(),
+                  StringToHex(values[0].ToString()).c_str(),
+                  StringToHex(values[i].ToString()).c_str());
+        // we continue after error rather than exiting so that we can
+        // find more errors if any
+        }
+      }
+    }
+
+    return statuses;
+  }
+
   // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
   // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
   // of the key. Each of these 10 scans returns a series of values;
@@ -3747,6 +3881,37 @@ class AtomicFlushStressTest : public StressTest {
     return s;
   }
 
+  virtual std::vector<Status> TestMultiGet(ThreadState* thread,
+      const ReadOptions& read_opts,
+      const std::vector<int>& rand_column_families,
+      const std::vector<int64_t>& rand_keys) {
+    int num_keys = rand_keys.size();
+    std::vector<std::string> key_str;
+    std::vector<Slice> keys;
+    std::vector<PinnableSlice> values(num_keys);
+    std::vector<Status> statuses(num_keys);
+    ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+
+    for (int i = 0; i < num_keys; ++i) {
+      key_str.emplace_back(Key(rand_keys[i]));
+      keys.emplace_back(key_str.back());
+    }
+    db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), statuses.data());
+    for (auto s : statuses) {
+      if (s.ok()) {
+        // found case
+        thread->stats.AddGets(1, 1);
+      } else if (s.IsNotFound()) {
+        // not found case
+        thread->stats.AddGets(1, 0);
+      } else {
+        // errors case
+        thread->stats.AddErrors(1);
+      }
+    }
+    return statuses;
+  }
+
   virtual Status TestPrefixScan(ThreadState* thread,
                                 const ReadOptions& readoptions,
                                 const std::vector<int>& rand_column_families,