WARNING_FLAGS = -Wall -Werror
CFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
-CXXFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -std=gnu++0x
+CXXFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -std=gnu++0x -Woverloaded-virtual
LDFLAGS += $(PLATFORM_LDFLAGS)
SaveKey(ikey.user_key, &saved_key_);
current_entry_is_merged_ = true;
valid_ = true;
- // Go to a different state machine
- MergeValuesNewToOld();
- // TODO: what if !iter_->Valid()
+ MergeValuesNewToOld(); // Go to a different state machine
return;
- break;
case kTypeLogData:
assert(false);
break;
// POST: saved_value_ has the merged value for the user key
// iter_ points to the next entry (or invalid)
void DBIter::MergeValuesNewToOld() {
- // TODO: Is there a way to unite with MergeHelper or other similar code?
+ if (!user_merge_operator_) {
+ Log(logger_, "Options::merge_operator is null.");
+ throw std::logic_error("DBIter::MergeValuesNewToOld() with"
+ " Options::merge_operator null");
+ }
// Start the merge process by pushing the first operand
std::deque<std::string> operands;
// final result in saved_value_. We are done!
// ignore corruption if there is any.
const Slice value = iter_->value();
- user_merge_operator_->Merge(ikey.user_key, &value, operands,
- &saved_value_, logger_.get());
+ user_merge_operator_->FullMerge(ikey.user_key, &value, operands,
+ &saved_value_, logger_.get());
// iter_ is positioned after put
iter_->Next();
return;
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
- user_merge_operator_->Merge(ikey.user_key, nullptr, operands,
- &saved_value_, logger_.get());
+ user_merge_operator_->FullMerge(saved_key_, nullptr, operands,
+ &saved_value_, logger_.get());
}
void DBIter::Prev() {
*s = Status::OK();
if (merge_in_progress) {
assert(merge_operator);
- if (!merge_operator->Merge(key.user_key(), &v, *operands,
- value, logger.get())) {
+ if (!merge_operator->FullMerge(key.user_key(), &v, *operands,
+ value, logger.get())) {
RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
if (merge_in_progress) {
assert(merge_operator);
*s = Status::OK();
- if (!merge_operator->Merge(key.user_key(), nullptr, *operands,
- value, logger.get())) {
+ if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands,
+ value, logger.get())) {
RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
// => store result in operands_.back() (and update keys_.back())
// => change the entry type to kTypeValue for keys_.back()
// We are done! Return a success if the merge passes.
- success_ = user_merge_operator_->Merge(ikey.user_key, nullptr,
- operands_, &merge_result,
- logger_);
+ success_ = user_merge_operator_->FullMerge(ikey.user_key, nullptr,
+ operands_, &merge_result,
+ logger_);
// We store the result in keys_.back() and operands_.back()
// if nothing went wrong (i.e.: no operand corruption on disk)
// => change the entry type to kTypeValue for keys_.back()
// We are done! Success!
const Slice value = iter->value();
- success_ = user_merge_operator_->Merge(ikey.user_key, &value,
- operands_, &merge_result,
- logger_);
+ success_ = user_merge_operator_->FullMerge(ikey.user_key, &value,
+ operands_, &merge_result,
+ logger_);
// We store the result in keys_.back() and operands_.back()
// if nothing went wrong (i.e.: no operand corruption on disk)
assert(kTypeMerge == orig_ikey.type);
assert(operands_.size() >= 1);
assert(operands_.size() == keys_.size());
- success_ = user_merge_operator_->Merge(ikey.user_key, nullptr,
- operands_, &merge_result,
- logger_);
+ success_ = user_merge_operator_->FullMerge(ikey.user_key, nullptr,
+ operands_, &merge_result,
+ logger_);
if (success_) {
std::string& key = keys_.back(); // The original key encountered
// Given a "real" merge from the library, call the user's
// associative merge function one-by-one on each of the operands.
// NOTE: It is assumed that the client's merge-operator will handle any errors.
-bool AssociativeMergeOperator::Merge(
+bool AssociativeMergeOperator::FullMerge(
const Slice& key,
const Slice* existing_value,
const std::deque<std::string>& operand_list,
} else if (kMerge == s->state) {
assert(s->merge_operator != nullptr);
s->state = kFound;
- if (!s->merge_operator->Merge(s->user_key, &v, *ops,
- s->value, s->logger)) {
+ if (!s->merge_operator->FullMerge(s->user_key, &v, *ops,
+ s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
}
s->state = kDeleted;
} else if (kMerge == s->state) {
s->state = kFound;
- if (!s->merge_operator->Merge(s->user_key, nullptr, *ops,
- s->value, s->logger)) {
+ if (!s->merge_operator->FullMerge(s->user_key, nullptr, *ops,
+ s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
}
if (kMerge == saver.state) {
// merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands;
- if (merge_operator->Merge(user_key, nullptr, *saver.merge_operands,
- value, logger.get())) {
+ if (merge_operator->FullMerge(user_key, nullptr, *saver.merge_operands,
+ value, logger.get())) {
*status = Status::OK();
} else {
RecordTick(db_options.statistics, NUMBER_MERGE_FAILURES);
// into rocksdb); numeric addition and string concatenation are examples;
//
// b) MergeOperator - the generic class for all the more abstract / complex
-// operations; one method to merge a Put/Delete value with a merge operand;
-// and another method (PartialMerge) that merges two operands together.
-// this is especially useful if your key values have a complex structure,
-// but you would still like to support client-specific incremental updates.
+// operations; one method (FullMerge) to merge a Put/Delete value with a
+// merge operand; and another method (PartialMerge) that merges two
+// operands together. this is especially useful if your key values have a
+// complex structure but you would still like to support client-specific
+// incremental updates.
//
// AssociativeMergeOperator is simpler to implement. MergeOperator is simply
// more powerful.
// internal corruption. This will be treated as an error by the library.
//
// Also make use of the *logger for error messages.
- virtual bool Merge(const Slice& key,
- const Slice* existing_value,
- const std::deque<std::string>& operand_list,
- std::string* new_value,
- Logger* logger) const = 0;
+ virtual bool FullMerge(const Slice& key,
+ const Slice* existing_value,
+ const std::deque<std::string>& operand_list,
+ std::string* new_value,
+ Logger* logger) const = 0;
// This function performs merge(left_op, right_op)
// when both the operands are themselves merge operation types
// TODO: Presently there is no way to differentiate between error/corruption
// and simply "return false". For now, the client should simply return
// false in any case it cannot perform partial-merge, regardless of reason.
- // If there is corruption in the data, handle it in the above Merge() function,
+ // If there is corruption in the data, handle it in the FullMerge() function,
// and return false there.
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
private:
// Default implementations of the MergeOperator functions
- virtual bool Merge(const Slice& key,
- const Slice* existing_value,
- const std::deque<std::string>& operand_list,
- std::string* new_value,
- Logger* logger) const override;
+ virtual bool FullMerge(const Slice& key,
+ const Slice* existing_value,
+ const std::deque<std::string>& operand_list,
+ std::string* new_value,
+ Logger* logger) const override;
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& key,
std::string* value,
bool* value_found = nullptr) override {
- return KeyMayExist(options, key, value, value_found);
+ return sdb_->KeyMayExist(options, key, value, value_found);
}
virtual Status Delete(const WriteOptions& wopts, const Slice& key) override {
// From the client-perspective, semantics are the same.
class PutOperator : public MergeOperator {
public:
- virtual bool Merge(const Slice& key,
- const Slice* existing_value,
- const std::deque<std::string>& operand_sequence,
- std::string* new_value,
- Logger* logger) const override {
+ virtual bool FullMerge(const Slice& key,
+ const Slice* existing_value,
+ const std::deque<std::string>& operand_sequence,
+ std::string* new_value,
+ Logger* logger) const override {
// Put basically only looks at the current/latest value
assert(!operand_sequence.empty());
assert(new_value != nullptr);
class StringAppendOperator : public AssociativeMergeOperator {
public:
-
StringAppendOperator(char delim_char); /// Constructor: specify delimiter
virtual bool Merge(const Slice& key,
}
// Implementation for the merge operation (concatenates two strings)
-bool StringAppendTESTOperator::Merge(const Slice& key,
- const Slice* existing_value,
- const std::deque<std::string>& operands,
- std::string* new_value,
- Logger* logger) const {
+bool StringAppendTESTOperator::FullMerge(
+ const Slice& key,
+ const Slice* existing_value,
+ const std::deque<std::string>& operands,
+ std::string* new_value,
+ Logger* logger) const {
// Clear the *new_value for writing.
assert(new_value);
StringAppendTESTOperator(char delim_char); /// Constructor with delimiter
- virtual bool Merge(const Slice& key,
- const Slice* existing_value,
- const std::deque<std::string>& operand_sequence,
- std::string* new_value,
- Logger* logger) const override;
+ virtual bool FullMerge(const Slice& key,
+ const Slice* existing_value,
+ const std::deque<std::string>& operand_sequence,
+ std::string* new_value,
+ Logger* logger) const override;
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
// THE TEST CASES BEGIN HERE
class StringAppendOperatorTest { };
+TEST(StringAppendOperatorTest, IteratorTest) {
+ DestroyDB(kDbName, Options()); // Start this test with a fresh DB
+
+ StringAppendOperator append_op(',');
+ auto db_ = OpenDb(&append_op);
+ StringLists slists(db_);
+
+ slists.Append("k1","v1");
+ slists.Append("k1","v2");
+ slists.Append("k1","v3");
+
+ slists.Append("k2","a1");
+ slists.Append("k2","a2");
+ slists.Append("k2","a3");
+
+ std::string res;
+ std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(ReadOptions()));
+ std::string k1("k1");
+ std::string k2("k2");
+ bool first = true;
+ for (it->Seek(k1); it->Valid(); it->Next()) {
+ res = it->value().ToString();
+ if (first) {
+ ASSERT_EQ(res, "v1,v2,v3");
+ first = false;
+ } else {
+ ASSERT_EQ(res, "a1,a2,a3");
+ }
+ }
+ slists.Append("k2", "a4");
+ slists.Append("k1", "v4");
+
+ // Snapshot should still be the same. Should ignore a4 and v4.
+ first = true;
+ for (it->Seek(k1); it->Valid(); it->Next()) {
+ res = it->value().ToString();
+ if (first) {
+ ASSERT_EQ(res, "v1,v2,v3");
+ first = false;
+ } else {
+ ASSERT_EQ(res, "a1,a2,a3");
+ }
+ }
+
+
+ // Should release the snapshot and be aware of the new stuff now
+ it.reset(db_->NewIterator(ReadOptions()));
+ first = true;
+ for (it->Seek(k1); it->Valid(); it->Next()) {
+ res = it->value().ToString();
+ if (first) {
+ ASSERT_EQ(res, "v1,v2,v3,v4");
+ first = false;
+ } else {
+ ASSERT_EQ(res, "a1,a2,a3,a4");
+ }
+ }
+
+ // start from k2 this time.
+ for (it->Seek(k2); it->Valid(); it->Next()) {
+ res = it->value().ToString();
+ if (first) {
+ ASSERT_EQ(res, "v1,v2,v3,v4");
+ first = false;
+ } else {
+ ASSERT_EQ(res, "a1,a2,a3,a4");
+ }
+ }
+
+ slists.Append("k3","g1");
+
+ it.reset(db_->NewIterator(ReadOptions()));
+ first = true;
+ std::string k3("k3");
+ for(it->Seek(k2); it->Valid(); it->Next()) {
+ res = it->value().ToString();
+ if (first) {
+ ASSERT_EQ(res, "a1,a2,a3,a4");
+ first = false;
+ } else {
+ ASSERT_EQ(res, "g1");
+ }
+ }
+ for(it->Seek(k3); it->Valid(); it->Next()) {
+ res = it->value().ToString();
+ if (first) {
+ // should not be hit
+ ASSERT_EQ(res, "a1,a2,a3,a4");
+ first = false;
+ } else {
+ ASSERT_EQ(res, "g1");
+ }
+ }
+
+}
TEST(StringAppendOperatorTest,SimpleTest) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB
supported with TTL"));
}
-bool DBWithTTL::KeyMayExist(ReadOptions& options,
+bool DBWithTTL::KeyMayExist(const ReadOptions& options,
const Slice& key,
std::string* value,
bool* value_found) {
const std::vector<Slice>& keys,
std::vector<std::string>* values);
- virtual bool KeyMayExist(ReadOptions& options,
+ virtual bool KeyMayExist(const ReadOptions& options,
const Slice& key,
std::string* value,
- bool* value_found = nullptr);
+ bool* value_found = nullptr) override;
virtual Status Delete(const WriteOptions& wopts, const Slice& key);
assert(merge_op);
}
- virtual bool Merge(const Slice& key,
- const Slice* existing_value,
- const std::deque<std::string>& operands,
- std::string* new_value,
- Logger* logger) const override {
+ virtual bool FullMerge(const Slice& key,
+ const Slice* existing_value,
+ const std::deque<std::string>& operands,
+ std::string* new_value,
+ Logger* logger) const override {
const uint32_t ts_len = DBWithTTL::kTSLength;
if (existing_value && existing_value->size() < ts_len) {
Log(logger, "Error: Could not remove timestamp from existing value.");
}
// Apply the user merge operator (store result in *new_value)
+ bool good = true;
if (existing_value) {
Slice existing_value_without_ts(existing_value->data(),
existing_value->size() - ts_len);
- user_merge_op_->Merge(key, &existing_value_without_ts,
- operands_without_ts, new_value, logger);
+ good = user_merge_op_->FullMerge(key, &existing_value_without_ts,
+ operands_without_ts, new_value, logger);
} else {
- user_merge_op_->Merge(key, nullptr, operands_without_ts, new_value,
- logger);
+ good = user_merge_op_->FullMerge(key, nullptr, operands_without_ts,
+ new_value, logger);
+ }
+
+ // Return false if the user merge operator returned false
+ if (!good) {
+ return false;
}
// Augment the *new_value with the ttl time-stamp
assert(new_value);
Slice left_without_ts(left_operand.data(), left_operand.size() - ts_len);
Slice right_without_ts(right_operand.data(), right_operand.size() - ts_len);
- user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts,
- new_value, logger);
+ if (!user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts,
+ new_value, logger)) {
+ return false;
+ }
// Augment the *new_value with the ttl time-stamp
int32_t curtime;