]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Blob DB: Fix race condition between flush and write
authorYi Wu <yiwu@fb.com>
Thu, 9 Nov 2017 03:33:12 +0000 (19:33 -0800)
committerYi Wu <yiwu@fb.com>
Thu, 9 Nov 2017 05:28:02 +0000 (21:28 -0800)
Summary:
A race condition will happen when:
* a user thread writes a value, but it hits the write stop condition because there are too many un-flushed memtables, while holding blob_db_impl.write_mutex_.
* Flush is triggered and call flush begin listener and try to acquire blob_db_impl.write_mutex_.

Fixing it.
Closes https://github.com/facebook/rocksdb/pull/3149

Differential Revision: D6279805

Pulled By: yiwu-arbug

fbshipit-source-id: 0e3c58afb78795ebe3360a2c69e05651e3908c40

utilities/blob_db/blob_db_impl.cc

index 319b0f294034f77cc6793099c221ad5b5d6b7dc2..1b13513922a60b7f0571acaf10ed597732ea3a47 100644 (file)
@@ -745,13 +745,22 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
 };
 
 Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
-  MutexLock l(&write_mutex_);
 
   uint32_t default_cf_id =
       reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+  // TODO(yiwu): In case there are multiple writers the latest sequence would
+  // not be the actually sequence we are writting. Need to get the sequence
+  // from write batch after DB write instead.
   SequenceNumber current_seq = GetLatestSequenceNumber() + 1;
+  Status s;
   BlobInserter blob_inserter(options, this, default_cf_id, current_seq);
-  Status s = updates->Iterate(&blob_inserter);
+  {
+    // Release write_mutex_ before DB write to avoid race condition with
+    // flush begin listener, which also require write_mutex_ to sync
+    // blob files.
+    MutexLock l(&write_mutex_);
+    s = updates->Iterate(&blob_inserter);
+  }
   if (!s.ok()) {
     return s;
   }
@@ -759,7 +768,6 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
   if (!s.ok()) {
     return s;
   }
-  assert(blob_inserter.sequence() == GetLatestSequenceNumber() + 1);
 
   // add deleted key to list of keys that have been deleted for book-keeping
   class DeleteBookkeeper : public WriteBatch::Handler {
@@ -849,10 +857,19 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
 Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
                             const Slice& value, uint64_t expiration) {
   TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
-  MutexLock l(&write_mutex_);
-  SequenceNumber sequence = GetLatestSequenceNumber() + 1;
+  Status s;
   WriteBatch batch;
-  Status s = PutBlobValue(options, key, value, expiration, sequence, &batch);
+  {
+    // Release write_mutex_ before DB write to avoid race condition with
+    // flush begin listener, which also require write_mutex_ to sync
+    // blob files.
+    MutexLock l(&write_mutex_);
+    // TODO(yiwu): In case there are multiple writers the latest sequence would
+    // not be the actually sequence we are writting. Need to get the sequence
+    // from write batch after DB write instead.
+    SequenceNumber sequence = GetLatestSequenceNumber() + 1;
+    s = PutBlobValue(options, key, value, expiration, sequence, &batch);
+  }
   if (s.ok()) {
     s = db_->Write(options, &batch);
   }
@@ -1198,8 +1215,6 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
     return Status::Corruption("Corruption. Blob CRC mismatch");
   }
 
-  // TODO(yiwu): Should use compression flag in the blob file instead of
-  // current compression option.
   if (bfile->compression() != kNoCompression) {
     BlockContents contents;
     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());