"If non-zero, then releases snapshots N operations after they're "
"acquired.");
+DEFINE_bool(use_multiget, false,
+ "If set, use the batched MultiGet API for reads");
+
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
if (value < 0 || value>100) {
fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n",
return base_key + thread->rand.Next() % FLAGS_active_width;
}
+ static std::vector<int64_t> GenerateNKeys(
+ ThreadState* thread,
+ int num_keys,
+ uint64_t iteration) {
+ const double completed_ratio =
+ static_cast<double>(iteration) / FLAGS_ops_per_thread;
+ const int64_t base_key = static_cast<int64_t>(
+ completed_ratio * (FLAGS_max_key - FLAGS_active_width));
+ std::vector<int64_t> keys;
+ keys.reserve(num_keys);
+ int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
+ keys.push_back(next_key);
+ for (int i = 1; i < num_keys; ++i) {
+ // This may result in some duplicate keys
+ next_key = next_key + thread->rand.Next() %
+ (FLAGS_active_width - (next_key - base_key));
+ keys.push_back(next_key);
+ }
+ return keys;
+ }
+
static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) {
size_t value_sz =
((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
int prob_op = thread->rand.Uniform(100);
if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
// OPERATION read
- TestGet(thread, read_opts, rand_column_families, rand_keys);
+ if (FLAGS_use_multiget) {
+ int num_keys = thread->rand.Uniform(64);
+ rand_keys = GenerateNKeys(thread, num_keys, i);
+ TestMultiGet(thread, read_opts, rand_column_families, rand_keys);
+ i += num_keys - 1;
+ } else {
+ TestGet(thread, read_opts, rand_column_families, rand_keys);
+ }
} else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
// OPERATION prefix scan
// keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
+ virtual std::vector<Status> TestMultiGet(ThreadState* thread,
+ const ReadOptions& read_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) = 0;
+
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
fprintf(stdout, "Max subcompactions : %" PRIu64 "\n",
FLAGS_subcompactions);
+ fprintf(stdout, "Use MultiGet : %s\n",
+ FLAGS_use_multiget ? "true" : "false");
const char* memtablerep = "";
switch (FLAGS_rep_factory) {
return s;
}
+ virtual std::vector<Status> TestMultiGet(ThreadState* thread,
+ const ReadOptions& read_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) {
+ size_t num_keys = rand_keys.size();
+ std::vector<std::string> key_str;
+ std::vector<Slice> keys;
+ std::vector<PinnableSlice> values(num_keys);
+ std::vector<Status> statuses(num_keys);
+ ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+
+ for (size_t i = 0; i < num_keys; ++i) {
+ key_str.emplace_back(Key(rand_keys[i]));
+ keys.emplace_back(key_str.back());
+ }
+ db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
+ statuses.data());
+ for (const auto& s : statuses) {
+ if (s.ok()) {
+ // found case
+ thread->stats.AddGets(1, 1);
+ } else if (s.IsNotFound()) {
+ // not found case
+ thread->stats.AddGets(1, 0);
+ } else {
+ // errors case
+ thread->stats.AddErrors(1);
+ }
+ }
+ return statuses;
+ }
+
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
return s;
}
+ virtual std::vector<Status> TestMultiGet(ThreadState* thread,
+ const ReadOptions& readoptions,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) {
+ int num_keys = rand_keys.size();
+ std::vector<Status> statuses(num_keys);
+ std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+ for (int key = 0; key < 10; ++key) {
+ std::vector<Slice> key_slices;
+ std::vector<PinnableSlice> values(num_keys);
+ ReadOptions readoptionscopy = readoptions;
+ readoptionscopy.snapshot = db_->GetSnapshot();
+ std::vector<std::string> key_str;
+ std::string from_db;
+ ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+
+ for (int rand_key = 0; rand_key < num_keys; ++rand_key) {
+ key_str.emplace_back(keys[key] + Key(rand_keys[rand_key]));
+ key_slices.emplace_back(key_str.back());
+ }
+ db_->MultiGet(readoptionscopy, cfh, num_keys, key_slices.data(),
+ values.data(), statuses.data());
+ for (int i = 0; i < num_keys; i++) {
+ Status s = statuses[i];
+ if (!s.ok() && !s.IsNotFound()) {
+ fprintf(stderr, "get error: %s\n", s.ToString().c_str());
+ thread->stats.AddErrors(1);
+ // we continue after error rather than exiting so that we can
+ // find more errors if any
+ } else if (s.IsNotFound()) {
+ thread->stats.AddGets(1, 0);
+ } else {
+ char expected_prefix = (keys[key])[0];
+ char actual_prefix = (values[i])[0];
+ if (actual_prefix != expected_prefix) {
+ fprintf(stderr, "error expected prefix = %c actual = %c\n",
+ expected_prefix, actual_prefix);
+ }
+ std::string str;
+ str.assign(values[i].data(), values[i].size());
+ values[i].Reset();
+ str[0] = ' '; // blank out the differing character
+ values[i].PinSelf(str);
+ thread->stats.AddGets(1, 1);
+ }
+ }
+ db_->ReleaseSnapshot(readoptionscopy.snapshot);
+
+ // Now that we retrieved all values, check that they all match
+ for (int i = 1; i < num_keys; i++) {
+ if (values[i] != values[0]) {
+ fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
+ key_str[i].c_str(),
+ StringToHex(values[0].ToString()).c_str(),
+ StringToHex(values[i].ToString()).c_str());
+ // we continue after error rather than exiting so that we can
+ // find more errors if any
+ }
+ }
+ }
+
+ return statuses;
+ }
+
// Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
// in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
// of the key. Each of these 10 scans returns a series of values;
return s;
}
+ virtual std::vector<Status> TestMultiGet(ThreadState* thread,
+ const ReadOptions& read_opts,
+ const std::vector<int>& rand_column_families,
+ const std::vector<int64_t>& rand_keys) {
+ int num_keys = rand_keys.size();
+ std::vector<std::string> key_str;
+ std::vector<Slice> keys;
+ std::vector<PinnableSlice> values(num_keys);
+ std::vector<Status> statuses(num_keys);
+ ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+
+ for (int i = 0; i < num_keys; ++i) {
+ key_str.emplace_back(Key(rand_keys[i]));
+ keys.emplace_back(key_str.back());
+ }
+ db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), statuses.data());
+ for (auto s : statuses) {
+ if (s.ok()) {
+ // found case
+ thread->stats.AddGets(1, 1);
+ } else if (s.IsNotFound()) {
+ // not found case
+ thread->stats.AddGets(1, 0);
+ } else {
+ // errors case
+ thread->stats.AddErrors(1);
+ }
+ }
+ return statuses;
+ }
+
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,