Open();
CreateColumnFamiliesAndReopen({"one", "two"});
WriteBatch batch;
+ batch.Put(handles_[0], Slice("existing"), Slice("column-family"));
batch.Put(handles_[1], Slice("non-existing"), Slice("column-family"));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
DropColumnFamilies({1});
+ WriteOptions woptions_ignore_missing_cf;
+ woptions_ignore_missing_cf.ignore_missing_column_families = true;
+ batch.Put(handles_[0], Slice("still here"), Slice("column-family"));
+ ASSERT_OK(db_->Write(woptions_ignore_missing_cf, &batch));
+ ASSERT_EQ("column-family", Get(0, "still here"));
Status s = db_->Write(WriteOptions(), &batch);
ASSERT_TRUE(s.IsInvalidArgument());
Close();
WriteBatch batch;
while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
- reporter.Corruption(
- record.size(), Status::Corruption("log record too small"));
+ reporter.Corruption(record.size(),
+ Status::Corruption("log record too small"));
continue;
}
WriteBatchInternal::SetContents(&batch, record);
+ // If column family was not found, it might mean that the WAL write
+ // batch references to the column family that was dropped after the
+ // insert. We don't want to fail the whole write batch in that case -- we
+ // just ignore the update. That's why we set ignore missing column families
+ // to true
status = WriteBatchInternal::InsertInto(
- &batch, column_family_memtables_.get(), true, log_number);
+ &batch, column_family_memtables_.get(),
+ true /* ignore missing column families */, log_number);
MaybeIgnoreError(&status);
if (!status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);
status = WriteBatchInternal::InsertInto(
- updates, column_family_memtables_.get(), false, 0, this, false);
+ updates, column_family_memtables_.get(),
+ options.ignore_missing_column_families, 0, this, false);
// A non-OK status here indicates iteration failure (either in-memory
// writebatch corruption (very bad), or the client specified invalid
// column family). This will later on trigger bg_error_.
public:
SequenceNumber sequence_;
ColumnFamilyMemTables* cf_mems_;
- bool recovery_;
+ bool ignore_missing_column_families_;
uint64_t log_number_;
DBImpl* db_;
const bool dont_filter_deletes_;
MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
- bool recovery, uint64_t log_number, DB* db,
- const bool dont_filter_deletes)
+ bool ignore_missing_column_families, uint64_t log_number,
+ DB* db, const bool dont_filter_deletes)
: sequence_(sequence),
cf_mems_(cf_mems),
- recovery_(recovery),
+ ignore_missing_column_families_(ignore_missing_column_families),
log_number_(log_number),
db_(reinterpret_cast<DBImpl*>(db)),
dont_filter_deletes_(dont_filter_deletes) {
bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
bool found = cf_mems_->Seek(column_family_id);
- if (recovery_ && (!found || log_number_ < cf_mems_->GetLogNumber())) {
- // if in recovery envoronment:
- // * If column family was not found, it might mean that the WAL write
- // batch references to the column family that was dropped after the
- // insert. We don't want to fail the whole write batch in that case -- we
- // just ignore the update.
+ if (!found) {
+ if (ignore_missing_column_families_) {
+ *s = Status::OK();
+ } else {
+ *s = Status::InvalidArgument(
+ "Invalid column family specified in write batch");
+ }
+ return false;
+ }
+ if (log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber()) {
+ // This is true only in recovery environment (log_number_ is always 0 in
+ // non-recovery, regular write code-path)
// * If log_number_ < cf_mems_->GetLogNumber(), this means that column
// family already contains updates from this log. We can't apply updates
// twice because of update-in-place or merge workloads -- ignore the
*s = Status::OK();
return false;
}
- if (!found) {
- assert(!recovery_);
- // If the column family was not found in non-recovery enviornment
- // (client's write code-path), we have to fail the write and return
- // the failure status to the client.
- *s = Status::InvalidArgument(
- "Invalid column family specified in write batch");
- return false;
- }
return true;
}
-
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
Status seek_status;
Status WriteBatchInternal::InsertInto(const WriteBatch* b,
ColumnFamilyMemTables* memtables,
- bool recovery, uint64_t log_number,
- DB* db, const bool dont_filter_deletes) {
+ bool ignore_missing_column_families,
+ uint64_t log_number, DB* db,
+ const bool dont_filter_deletes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables,
- recovery, log_number, db, dont_filter_deletes);
+ ignore_missing_column_families, log_number, db,
+ dont_filter_deletes);
return b->Iterate(&inserter);
}
// Inserts batch entries into memtable
// If dont_filter_deletes is false AND options.filter_deletes is true,
// then --> Drops deletes in batch if db->KeyMayExist returns false
- // If recovery == true, this means InsertInto is executed on a recovery
- // code-path. WriteBatch referencing a dropped column family can be
- // found on a recovery code-path and should be ignored (recovery should not
- // fail). Additionally, the memtable will be updated only if
+ // If ignore_missing_column_families == true. WriteBatch referencing
+ // non-existing column family should be ignored.
+ // However, if ignore_missing_column_families == false, any WriteBatch
+ // referencing non-existing column family will return a InvalidArgument()
+ // failure.
+ //
+ // If log_number is non-zero, the memtable will be updated only if
// memtables->GetLogNumber() >= log_number
- // However, if recovery == false, any WriteBatch referencing
- // non-existing column family will return a failure. Also, log_number is
- // ignored in that case
static Status InsertInto(const WriteBatch* batch,
ColumnFamilyMemTables* memtables,
- bool recovery = false, uint64_t log_number = 0,
- DB* db = nullptr,
+ bool ignore_missing_column_families = false,
+ uint64_t log_number = 0, DB* db = nullptr,
const bool dont_filter_deletes = true);
static void Append(WriteBatch* dst, const WriteBatch* src);
// Default: 0
uint64_t timeout_hint_us;
- WriteOptions() : sync(false), disableWAL(false), timeout_hint_us(0) {}
+ // If true and if user is trying to write to column families that don't exist
+ // (they were dropped), ignore the write (don't return an error). If there
+ // are multiple writes in a WriteBatch, other writes will succeed.
+ // Default: false
+ bool ignore_missing_column_families;
+
+ WriteOptions()
+ : sync(false),
+ disableWAL(false),
+ timeout_hint_us(0),
+ ignore_missing_column_families(false) {}
};
// Options that control flush operations