#include <cstddef>
#include <memory>
#include <mutex>
+#include <queue>
#include <thread>
#include <unordered_map>
"Warning: large values can affect throughput. "
"Valid overwrite_window_size values: [1, kMaxUint32].");
+// Minimum delay, in microseconds, added to the current time when the Deletes
+// targeting a completed batch of disposable entries are scheduled.
+DEFINE_uint64(
+    disposable_entries_delete_delay, 0,
+    "Minimum delay in microseconds for the series of Deletes to be "
+    "issued. When 0 the insertion of the last disposable entry is "
+    "immediately followed by the issuance of the Deletes. (only "
+    "compatible with fillanddeleteuniquerandom benchmark).");
+
+// Number of disposable entries inserted back-to-back before their Deletes
+// are scheduled. The help text names the companion delay flag by its full
+// name so users can find it with --help.
+DEFINE_uint64(disposable_entries_batch_size, 0,
+              "Number of consecutively inserted disposable KV entries "
+              "that will be deleted after "
+              "'disposable_entries_delete_delay' microseconds. "
+              "A series of Deletes is always issued once all the "
+              "disposable KV entries it targets have been inserted "
+              "into the DB. When 0 no deletes are issued and a "
+              "regular 'filluniquerandom' benchmark occurs. "
+              "(only compatible with fillanddeleteuniquerandom benchmark).");
+
+// Value size, in bytes, used for entries that are later targeted by the
+// selective deletes.
+DEFINE_int32(
+    disposable_entries_value_size, 64,
+    "Size of the values (in bytes) of the entries targeted by selective "
+    "deletes. (only compatible with fillanddeleteuniquerandom "
+    "benchmark)");
+
+// Number of persistent (never deleted) entries written right before the
+// Deletes of a disposable batch are issued.
+DEFINE_uint64(
+    persistent_entries_batch_size, 0,
+    "Number of KV entries being inserted right before the deletes "
+    "targeting the disposable KV entries are issued. These "
+    "persistent keys are not targeted by the deletes, and will always "
+    "remain valid in the DB. (only compatible with "
+    "--benchmarks='fillanddeleteuniquerandom' "
+    "and used when --disposable_entries_batch_size is > 0).");
+
+// Value size, in bytes, used for persistent entries (those not targeted by
+// the deletes).
+DEFINE_int32(persistent_entries_value_size, 64,
+             "Size of the values (in bytes) of the entries not targeted by "
+             "deletes. (only compatible with "
+             "--benchmarks='fillanddeleteuniquerandom' "
+             "and used when --disposable_entries_batch_size is > 0).");
+
DEFINE_double(read_random_exp_range, 0.0,
"Read random's key will be generated using distribution of "
"num * exp(-r) where r is uniform number from 0 to this value. "
} else if (name == "fillrandom") {
fresh_db = true;
method = &Benchmark::WriteRandom;
- } else if (name == "filluniquerandom") {
+ } else if (name == "filluniquerandom" ||
+ name == "fillanddeleteuniquerandom") {
fresh_db = true;
if (num_threads > 1) {
fprintf(stderr,
- "filluniquerandom multithreaded not supported"
- ", use 1 thread");
+ "filluniquerandom and fillanddeleteuniquerandom "
+ "multithreaded not supported, use 1 thread");
num_threads = 1;
}
method = &Benchmark::WriteUniqueRandom;
return std::numeric_limits<uint64_t>::max();
}
+ // Random-access read: returns the entry of values_ stored at position
+ // `index`. Only available for UNIQUE_RANDOM mode. Both the mode and the
+ // bound on `index` are checked with assert only.
+ uint64_t Fetch(uint64_t index) {
+   assert(mode_ == UNIQUE_RANDOM);
+   assert(index < values_.size());
+   const uint64_t fetched_value = values_[index];
+   return fetched_value;
+ }
+
private:
Random64* rand_;
WriteMode mode_;
std::unique_ptr<const char[]> end_key_guard;
Slice end_key = AllocateKey(&end_key_guard);
double p = 0.0;
- uint64_t num_overwrites = 0, num_unique_keys = 0;
+ uint64_t num_overwrites = 0, num_unique_keys = 0, num_selective_deletes = 0;
// If user set overwrite_probability flag,
// check if value is in [0.0,1.0].
if (FLAGS_overwrite_probability > 0.0) {
std::deque<int64_t> inserted_key_window;
Random64 reservoir_id_gen(FLAGS_seed);
+ // --- Variables used in disposable/persistent keys simulation:
+ // The following variables are used when
+ // disposable_entries_batch_size is >0. We simulate a workload
+ // where the following sequence is repeated multiple times:
+ // "A set of keys S1 is inserted ('disposable entries'), then after
+ // some delay another set of keys S2 is inserted ('persistent entries')
+ // and the first set of keys S1 is deleted. S2 artificially represents
+ // the insertion of hypothetical results from some undefined computation
+ // done on the first set of keys S1. The next sequence can start as soon
+ // as the last disposable entry in the set S1 of this sequence is
+ // inserted, if the delay is non negligible"
+ bool skip_for_loop = false, is_disposable_entry = true;
+ std::vector<uint64_t> disposable_entries_index(num_key_gens, 0);
+ std::vector<uint64_t> persistent_ent_and_del_index(num_key_gens, 0);
+ const uint64_t kNumDispAndPersEntries =
+ FLAGS_disposable_entries_batch_size +
+ FLAGS_persistent_entries_batch_size;
+ // Validate the flag combination before running the disposable/persistent
+ // workload. Every failure prints a diagnostic to stderr and calls
+ // ErrorExit().
+ if (kNumDispAndPersEntries > 0) {
+   // The workload is rejected unless keys come from UNIQUE_RANDOM mode, and
+   // it cannot be combined with overwrites (p > 0.0) or range tombstones.
+   if ((write_mode != UNIQUE_RANDOM) || (writes_per_range_tombstone_ > 0) ||
+       (p > 0.0)) {
+     fprintf(
+         stderr,
+         "Disposable/persistent deletes are not compatible with overwrites "
+         "and DeleteRanges; and are only supported in filluniquerandom.\n");
+     ErrorExit();
+   }
+   // Disposable inserts are grouped using
+   // `index % FLAGS_disposable_entries_batch_size`; a zero batch size with a
+   // non-zero persistent batch size would make that a modulo by zero.
+   if (FLAGS_disposable_entries_batch_size == 0) {
+     fprintf(stderr,
+             "persistent_entries_batch_size requires "
+             "disposable_entries_batch_size to be > 0.\n");
+     ErrorExit();
+   }
+   // Zero-sized values are accepted; only negative sizes are rejected.
+   if (FLAGS_disposable_entries_value_size < 0 ||
+       FLAGS_persistent_entries_value_size < 0) {
+     fprintf(
+         stderr,
+         "disposable_entries_value_size and persistent_entries_value_size "
+         "have to be non-negative.\n");
+     ErrorExit();
+   }
+ }
+ Random rnd_disposable_entry(static_cast<uint32_t>(FLAGS_seed));
+ std::string random_value;
+ // Queue that stores scheduled timestamp of disposable entries deletes,
+ // along with starting index of disposable entry keys to delete.
+ std::vector<std::queue<std::pair<uint64_t, uint64_t>>> disposable_entries_q(
+ num_key_gens);
+ // --- End of variables used in disposable/persistent keys simulation.
+
std::vector<std::unique_ptr<const char[]>> expanded_key_guards;
std::vector<Slice> expanded_keys;
if (FLAGS_expand_range_tombstones) {
inserted_key_window.push_back(rand_num);
}
}
+ } else if (kNumDispAndPersEntries > 0) {
+ // Check if queue is non-empty and if we need to insert
+ // 'persistent' KV entries (KV entries that are never deleted)
+ // and delete disposable entries previously inserted.
+ if (!disposable_entries_q[id].empty() &&
+ (disposable_entries_q[id].front().first <
+ FLAGS_env->NowMicros())) {
+ // If we need to perform a "merge op" pattern,
+ // we first write all the persistent KV entries not targeted
+ // by deletes, and then we write the disposable entries deletes.
+ if (persistent_ent_and_del_index[id] <
+ FLAGS_persistent_entries_batch_size) {
+ // Generate key to insert.
+ rand_num =
+ key_gens[id]->Fetch(disposable_entries_q[id].front().second +
+ FLAGS_disposable_entries_batch_size +
+ persistent_ent_and_del_index[id]);
+ persistent_ent_and_del_index[id]++;
+ is_disposable_entry = false;
+ skip_for_loop = false;
+ } else if (persistent_ent_and_del_index[id] <
+ kNumDispAndPersEntries) {
+ // Find key of the entry to delete.
+ rand_num =
+ key_gens[id]->Fetch(disposable_entries_q[id].front().second +
+ (persistent_ent_and_del_index[id] -
+ FLAGS_persistent_entries_batch_size));
+ persistent_ent_and_del_index[id]++;
+ GenerateKeyFromInt(rand_num, FLAGS_num, &key);
+ // For the delete operation, everything happens here and we
+ // skip the rest of the for-loop, which is designed for
+ // inserts.
+ if (FLAGS_num_column_families <= 1) {
+ batch.Delete(key);
+ } else {
+ // We use same rand_num as seed for key and column family so
+ // that we can deterministically find the cfh corresponding to a
+ // particular key while reading the key.
+ batch.Delete(db_with_cfh->GetCfh(rand_num), key);
+ }
+ // A delete only includes Key+Timestamp (no value).
+ batch_bytes += key_size_ + user_timestamp_size_;
+ bytes += key_size_ + user_timestamp_size_;
+ num_selective_deletes++;
+ // Skip rest of the for-loop (j=0, j<entries_per_batch_,j++).
+ skip_for_loop = true;
+ } else {
+ assert(false); // should never reach this point.
+ }
+ // Once the current job is complete (all persistent inserts and all
+ // disposable deletes have been issued), pop it out of
+ // disposable_entries_q and reset the per-job counter.
+ if (!disposable_entries_q[id].empty() &&
+ (disposable_entries_q[id].front().first <
+ FLAGS_env->NowMicros()) &&
+ persistent_ent_and_del_index[id] == kNumDispAndPersEntries) {
+ disposable_entries_q[id].pop();
+ persistent_ent_and_del_index[id] = 0;
+ }
+
+ // If we are deleting disposable entries, skip the rest of the
+ // for-loop since there are no key-value inserts at this moment in
+ // time.
+ if (skip_for_loop) {
+ continue;
+ }
+
+ }
+ // If no job is in the queue, then we keep inserting disposable KV
+ // entries that will be deleted later by a series of deletes.
+ else {
+ rand_num = key_gens[id]->Fetch(disposable_entries_index[id]);
+ disposable_entries_index[id]++;
+ is_disposable_entry = true;
+ if ((disposable_entries_index[id] %
+ FLAGS_disposable_entries_batch_size) == 0) {
+ // Skip the persistent KV entries inserts for now
+ disposable_entries_index[id] +=
+ FLAGS_persistent_entries_batch_size;
+ }
+ }
} else {
rand_num = key_gens[id]->Next();
}
GenerateKeyFromInt(rand_num, FLAGS_num, &key);
- Slice val = gen.Generate();
+ Slice val;
+ if (kNumDispAndPersEntries > 0) {
+ random_value = rnd_disposable_entry.RandomString(
+ is_disposable_entry ? FLAGS_disposable_entries_value_size
+ : FLAGS_persistent_entries_value_size);
+ val = Slice(random_value);
+ num_unique_keys++;
+ } else {
+ val = gen.Generate();
+ }
if (use_blob_db_) {
#ifndef ROCKSDB_LITE
// Stacked BlobDB
batch_bytes += val.size() + key_size_ + user_timestamp_size_;
bytes += val.size() + key_size_ + user_timestamp_size_;
++num_written;
+
+ // If all disposable entries have been inserted, then we need to
+ // add in the job queue a call for 'persistent entry insertions +
+ // disposable entry deletions'.
+ if (kNumDispAndPersEntries > 0 && is_disposable_entry &&
+ ((disposable_entries_index[id] % kNumDispAndPersEntries) == 0)) {
+ // Queue contains [timestamp, starting_idx]:
+ // - timestamp = current_time + delay (minimum absolute time at which
+ //   the selective deletes may start being issued);
+ // - starting_idx = index in the keygen of the rand_num used to generate
+ //   the key of the first KV entry to delete (= key of the first
+ //   selective delete).
+ disposable_entries_q[id].push(std::make_pair(
+ FLAGS_env->NowMicros() +
+ FLAGS_disposable_entries_delete_delay /* timestamp */,
+ disposable_entries_index[id] - kNumDispAndPersEntries
+ /*starting idx*/));
+ }
if (writes_per_range_tombstone_ > 0 &&
num_written > writes_before_delete_range_ &&
(num_written - writes_before_delete_range_) /
}
if ((write_mode == UNIQUE_RANDOM) && (p > 0.0)) {
fprintf(stdout,
- "Number of unique keys inerted: %" PRIu64
+ "Number of unique keys inserted: %" PRIu64
".\nNumber of overwrites: %" PRIu64 "\n",
num_unique_keys, num_overwrites);
+ } else if (kNumDispAndPersEntries > 0) {
+ fprintf(stdout,
+ "Number of unique keys inserted (disposable+persistent): %" PRIu64
+ ".\nNumber of 'disposable entry delete': %" PRIu64 "\n",
+ num_written, num_selective_deletes);
}
thread->stats.AddBytes(bytes);
}