};
Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
- MutexLock l(&write_mutex_);
uint32_t default_cf_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+ // TODO(yiwu): In case there are multiple writers the latest sequence would
+ // not be the actually sequence we are writting. Need to get the sequence
+ // from write batch after DB write instead.
SequenceNumber current_seq = GetLatestSequenceNumber() + 1;
+ Status s;
BlobInserter blob_inserter(options, this, default_cf_id, current_seq);
- Status s = updates->Iterate(&blob_inserter);
+ {
+ // Release write_mutex_ before DB write to avoid race condition with
+ // flush begin listener, which also require write_mutex_ to sync
+ // blob files.
+ MutexLock l(&write_mutex_);
+ s = updates->Iterate(&blob_inserter);
+ }
if (!s.ok()) {
return s;
}
if (!s.ok()) {
return s;
}
- assert(blob_inserter.sequence() == GetLatestSequenceNumber() + 1);
// add deleted key to list of keys that have been deleted for book-keeping
class DeleteBookkeeper : public WriteBatch::Handler {
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration) {
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
- MutexLock l(&write_mutex_);
- SequenceNumber sequence = GetLatestSequenceNumber() + 1;
+ Status s;
WriteBatch batch;
- Status s = PutBlobValue(options, key, value, expiration, sequence, &batch);
+ {
+ // Release write_mutex_ before DB write to avoid race condition with
+ // flush begin listener, which also require write_mutex_ to sync
+ // blob files.
+ MutexLock l(&write_mutex_);
+ // TODO(yiwu): In case there are multiple writers the latest sequence would
+ // not be the actually sequence we are writting. Need to get the sequence
+ // from write batch after DB write instead.
+ SequenceNumber sequence = GetLatestSequenceNumber() + 1;
+ s = PutBlobValue(options, key, value, expiration, sequence, &batch);
+ }
if (s.ok()) {
s = db_->Write(options, &batch);
}
return Status::Corruption("Corruption. Blob CRC mismatch");
}
- // TODO(yiwu): Should use compression flag in the blob file instead of
- // current compression option.
if (bfile->compression() != kNoCompression) {
BlockContents contents;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());