std::string* timestamp) {
assert(user_comparator_);
if (options.timestamp) {
- const Status s =
- FailIfTsSizesMismatch(DefaultColumnFamily(), *(options.timestamp));
+ const Status s = FailIfTsMismatchCf(
+ DefaultColumnFamily(), *(options.timestamp), /*ts_for_read=*/true);
if (!s.ok()) {
return s;
}
size_t num_keys = keys.size();
if (options.timestamp) {
- Status s =
- FailIfTsSizesMismatch(DefaultColumnFamily(), *(options.timestamp));
+ Status s = FailIfTsMismatchCf(DefaultColumnFamily(), *(options.timestamp),
+ /*ts_for_read=*/true);
if (!s.ok()) {
return std::vector<Status>(num_keys, s);
}
assert(get_impl_options.column_family);
if (read_options.timestamp) {
- const Status s = FailIfTsSizesMismatch(get_impl_options.column_family,
- *(read_options.timestamp));
+ const Status s = FailIfTsMismatchCf(get_impl_options.column_family,
+ *(read_options.timestamp),
+ /*ts_for_read=*/true);
if (!s.ok()) {
return s;
}
for (size_t i = 0; i < num_keys; ++i) {
assert(column_family[i]);
if (read_options.timestamp) {
- stat_list[i] =
- FailIfTsSizesMismatch(column_family[i], *(read_options.timestamp));
+ stat_list[i] = FailIfTsMismatchCf(
+ column_family[i], *(read_options.timestamp), /*ts_for_read=*/true);
if (!stat_list[i].ok()) {
should_fail = true;
}
ColumnFamilyHandle* cfh = column_families[i];
assert(cfh);
if (read_options.timestamp) {
- statuses[i] = FailIfTsSizesMismatch(cfh, *(read_options.timestamp));
+ statuses[i] = FailIfTsMismatchCf(cfh, *(read_options.timestamp),
+ /*ts_for_read=*/true);
if (!statuses[i].ok()) {
should_fail = true;
}
assert(column_family);
if (read_options.timestamp) {
- const Status s =
- FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
+ const Status s = FailIfTsMismatchCf(
+ column_family, *(read_options.timestamp), /*ts_for_read=*/true);
if (!s.ok()) {
return NewErrorIterator(s);
}
if (read_options.timestamp) {
for (auto* cf : column_families) {
assert(cf);
- const Status s = FailIfTsSizesMismatch(cf, *(read_options.timestamp));
+ const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp),
+ /*ts_for_read=*/true);
if (!s.ok()) {
return s;
}
void SetDbSessionId();
Status FailIfCfHasTs(const ColumnFamilyHandle* column_family) const;
- Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family,
- const Slice& ts) const;
+ Status FailIfTsMismatchCf(ColumnFamilyHandle* column_family, const Slice& ts,
+ bool ts_for_read) const;
// recovery_ctx stores the context about version edits and
// LogAndApplyForRecovery persist all those edits to new Manifest after
return Status::OK();
}
-inline Status DBImpl::FailIfTsSizesMismatch(
- const ColumnFamilyHandle* column_family, const Slice& ts) const {
+inline Status DBImpl::FailIfTsMismatchCf(ColumnFamilyHandle* column_family,
+ const Slice& ts,
+ bool ts_for_read) const {
if (!column_family) {
return Status::InvalidArgument("column family handle cannot be null");
}
<< ts_sz << " given";
return Status::InvalidArgument(oss.str());
}
+ if (ts_for_read) {
+ auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
+ auto cfd = cfh->cfd();
+ std::string current_ts_low = cfd->GetFullHistoryTsLow();
+ if (!current_ts_low.empty() &&
+ ucmp->CompareTimestamp(ts, current_ts_low) < 0) {
+ std::stringstream oss;
+ oss << "Read timestamp: " << ts.ToString(true)
+ << " is smaller than full_history_ts_low: "
+ << Slice(current_ts_low).ToString(true) << std::endl;
+ return Status::InvalidArgument(oss.str());
+ }
+ }
return Status::OK();
}
assert(column_family);
if (read_options.timestamp) {
- const Status s =
- FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
+ const Status s = FailIfTsMismatchCf(
+ column_family, *(read_options.timestamp), /*ts_for_read=*/true);
if (!s.ok()) {
return s;
}
ColumnFamilyHandle* column_family) {
assert(column_family);
if (read_options.timestamp) {
- const Status s =
- FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
+ const Status s = FailIfTsMismatchCf(
+ column_family, *(read_options.timestamp), /*ts_for_read=*/true);
if (!s.ok()) {
return NewErrorIterator(s);
}
if (read_options.timestamp) {
for (auto* cf : column_families) {
assert(cf);
- const Status s = FailIfTsSizesMismatch(cf, *(read_options.timestamp));
+ const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp),
+ /*ts_for_read=*/true);
if (!s.ok()) {
return s;
}
assert(column_family);
if (read_options.timestamp) {
- const Status s =
- FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
+ const Status s = FailIfTsMismatchCf(
+ column_family, *(read_options.timestamp), /*ts_for_read=*/true);
if (!s.ok()) {
return s;
}
assert(column_family);
if (read_options.timestamp) {
- const Status s =
- FailIfTsSizesMismatch(column_family, *(read_options.timestamp));
+ const Status s = FailIfTsMismatchCf(
+ column_family, *(read_options.timestamp), /*ts_for_read=*/true);
if (!s.ok()) {
return NewErrorIterator(s);
}
if (read_options.timestamp) {
for (auto* cf : column_families) {
assert(cf);
- const Status s = FailIfTsSizesMismatch(cf, *(read_options.timestamp));
+ const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp),
+ /*ts_for_read=*/true);
if (!s.ok()) {
return s;
}
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& val) {
- const Status s = FailIfTsSizesMismatch(column_family, ts);
+ const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false);
if (!s.ok()) {
return s;
}
Status DBImpl::Delete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts) {
- const Status s = FailIfTsSizesMismatch(column_family, ts);
+ const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false);
if (!s.ok()) {
return s;
}
Status DBImpl::SingleDelete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts) {
- const Status s = FailIfTsSizesMismatch(column_family, ts);
+ const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false);
if (!s.ok()) {
return s;
}
}
ASSERT_OK(Flush());
- // TODO return a non-ok for read ts < current_ts_low and test it.
for (int i = 0; i < 10; i++) {
ReadOptions read_opts;
std::string ts_str = Timestamp(i, 0);
read_opts.timestamp = &ts;
std::string value;
Status status = db_->Get(read_opts, kKey, &value);
- if (i < current_ts_low - 1) {
- ASSERT_TRUE(status.IsNotFound());
+ if (i < current_ts_low) {
+ ASSERT_TRUE(status.IsInvalidArgument());
} else {
ASSERT_OK(status);
ASSERT_TRUE(value.compare(Key(i)) == 0);
result_ts_low = cfd->GetFullHistoryTsLow();
ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0);
- // TODO return a non-ok for read ts < current_ts_low and test it.
for (int i = current_ts_low; i < 20; i++) {
ReadOptions read_opts;
std::string ts_str = Timestamp(i, 0);
value, exclusive, do_validate);
}
} else {
- Status s = db_impl_->FailIfTsSizesMismatch(column_family,
- *(read_options.timestamp));
+ Status s = db_impl_->FailIfTsMismatchCf(
+ column_family, *(read_options.timestamp), /*ts_for_read=*/true);
if (!s.ok()) {
return s;
}