if (UNLIKELY(status.ok()) &&
(write_controller_.IsStopped() || write_controller_.GetDelay() > 0)) {
+ // If writer is stopped, we need to get it going,
+ // so schedule flushes/compactions
+ if (context.schedule_bg_work_) {
+ MaybeScheduleFlushOrCompaction();
+ }
status = DelayWrite(expiration_time);
}
ASSERT_OK(db_->CompactRange(nullptr, nullptr));
}
+// Github issue #595
+// Regression test: a single WriteBatch much larger than the write buffer,
+// written against a DB opened with multiple column families, must be
+// ingested successfully and the DB must remain re-openable afterwards.
+TEST_F(DBTest, LargeBatchWithColumnFamilies) {
+  Options options;
+  options.env = env_;
+  options = CurrentOptions(options);
+  options.write_buffer_size = 100000;  // Small write buffer
+  CreateAndReopenWithCF({"pikachu"}, options);
+  int64_t j = 0;
+  // Grow the batch from 5 MB to 9 MB across iterations; three passes each
+  // so the same batch size is exercised repeatedly.
+  for (int i = 0; i < 5; i++) {
+    for (int pass = 1; pass <= 3; pass++) {
+      WriteBatch batch;
+      size_t write_size = 1024 * 1024 * (5 + i);
+      // NOTE: %zu matches size_t; the previous %ld was a printf
+      // format/argument mismatch on platforms where size_t != long.
+      fprintf(stderr, "prepare: %zu MB, pass:%d\n", (write_size / 1024 / 1024),
+              pass);
+      for (;;) {
+        // 3000-byte filler keyed off j, with j appended so keys are unique.
+        std::string data(3000, j++ % 127 + 20);
+        data += std::to_string(j);
+        batch.Put(handles_[0], Slice(data), Slice(data));
+        if (batch.GetDataSize() > write_size) {
+          break;
+        }
+      }
+      fprintf(stderr, "write: %zu MB\n", (batch.GetDataSize() / 1024 / 1024));
+      ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
+      fprintf(stderr, "done\n");
+    }
+  }
+  // make sure we can re-open it.
+  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
+}
+
} // namespace rocksdb
int main(int argc, char** argv) {