MaybeScheduleFlushOrCompaction();
} else if (UNLIKELY(write_buffer_.ShouldFlush())) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
- "Flushing all column families. Write buffer is using %" PRIu64
- " bytes out of a total of %" PRIu64 ".",
+ "Flushing column family with largest mem table size. Write buffer is "
+ "using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
write_buffer_.memory_usage(), write_buffer_.buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
+ ColumnFamilyData* largest_cfd = nullptr;
+ size_t largest_cfd_size = 0;
+
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (!cfd->mem()->IsEmpty()) {
- status = SwitchMemtable(cfd, &context);
- if (!status.ok()) {
- break;
+ // We only consider active mem table, hoping immutable memtable is
+ // already in the process of flushing.
+ size_t cfd_size = cfd->mem()->ApproximateMemoryUsage();
+ if (largest_cfd == nullptr || cfd_size > largest_cfd_size) {
+ largest_cfd = cfd;
+ largest_cfd_size = cfd_size;
}
- cfd->imm()->FlushRequested();
- SchedulePendingFlush(cfd);
}
}
- MaybeScheduleFlushOrCompaction();
+ if (largest_cfd != nullptr) {
+ status = SwitchMemtable(largest_cfd, &context);
+ if (status.ok()) {
+ largest_cfd->imm()->FlushRequested();
+ SchedulePendingFlush(largest_cfd);
+ MaybeScheduleFlushOrCompaction();
+ }
+ }
}
if (UNLIKELY(status.ok() && !bg_error_.ok())) {
options.write_buffer_size = 500000; // this is never hit
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
- // Trigger a flush on every CF
+ // Trigger a flush on CF "nikitich"
ASSERT_OK(Put(0, Key(1), DummyString(1)));
ASSERT_OK(Put(1, Key(1), DummyString(1)));
ASSERT_OK(Put(3, Key(1), DummyString(90000)));
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
- static_cast<uint64_t>(1));
+ static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
+ }
+
+ // "dobrynia": 20KB
+ // Flush 'dobrynia'
+ ASSERT_OK(Put(3, Key(2), DummyString(40000)));
+ ASSERT_OK(Put(2, Key(2), DummyString(70000)));
+ ASSERT_OK(Put(0, Key(1), DummyString(1)));
+ dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
+ {
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
+ static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
}
- // Flush 'dobrynia' and 'nikitich'
- ASSERT_OK(Put(2, Key(2), DummyString(50000)));
- ASSERT_OK(Put(3, Key(2), DummyString(40000)));
- ASSERT_OK(Put(2, Key(3), DummyString(20000)));
+ // "nikitich" still has has data of 80KB
+ // Inserting Data in "dobrynia" triggers "nikitich" flushing.
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
+ ASSERT_OK(Put(2, Key(2), DummyString(40000)));
+ ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
- static_cast<uint64_t>(1));
+ static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
- static_cast<uint64_t>(1));
+ static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
- static_cast<uint64_t>(2));
+ static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
- // Make 'dobrynia' and 'nikitich' both take up 40% of space
- // When 'pikachu' puts us over 100%, all 3 flush.
- ASSERT_OK(Put(2, Key(2), DummyString(40000)));
+ // "dobrynia" still has 40KB
ASSERT_OK(Put(1, Key(2), DummyString(20000)));
+ ASSERT_OK(Put(0, Key(1), DummyString(10000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
+ dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
+ // This should triggers no flush
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
- static_cast<uint64_t>(1));
+ static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
+ static_cast<uint64_t>(1));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
+ }
+
+ // "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB
+ ASSERT_OK(Put(1, Key(2), DummyString(40000)));
+ ASSERT_OK(Put(0, Key(1), DummyString(1)));
+ dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
+ dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
+ // This should triggers flush of "pikachu"
+ {
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
+ static_cast<uint64_t>(0));
+ ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
+ static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
- static_cast<uint64_t>(3));
+ static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
- static_cast<uint64_t>(3));
+ static_cast<uint64_t>(2));
}
- // Some remaining writes so 'default' and 'nikitich' flush on closure.
+ // "default": 10KB, "dobrynia": 40KB
+ // Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on
+ // closure.
ASSERT_OK(Put(3, Key(1), DummyString(1)));
ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
options);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
- static_cast<uint64_t>(2));
+ static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
- static_cast<uint64_t>(2));
+ static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
- static_cast<uint64_t>(3));
+ static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
- static_cast<uint64_t>(4));
+ static_cast<uint64_t>(3));
}
}
#endif // ROCKSDB_LITE