]> git-server-git.apps.pok.os.sepia.ceph.com Git - rocksdb.git/commitdiff
Flush the log outside of lock
authorIgor Canadi <icanadi@fb.com>
Thu, 7 Nov 2013 19:31:56 +0000 (11:31 -0800)
committerIgor Canadi <icanadi@fb.com>
Thu, 7 Nov 2013 19:31:56 +0000 (11:31 -0800)
Summary:
Added a new call LogFlush() that flushes the log contents to the OS buffers. We never call it with lock held.

We call it once for every Read/Write and often in compaction/flush process so the frequency should not be a problem.

Test Plan: db_test

Reviewers: dhruba, haobo, kailiu, emayanke

Reviewed By: dhruba

CC: leveldb
Differential Revision: https://reviews.facebook.net/D13935

db/db_impl.cc
db/version_set.cc
include/rocksdb/env.h
util/env.cc
util/posix_logger.h

index 6fd3196983529f8d0d9e8f15cf0d79df16c82104..ac1f84dc356a6764ad2cc9e8e4e88c69a07159c2 100644 (file)
@@ -311,6 +311,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
   }
   last_log_ts = 0;
 
+  LogFlush(options_.info_log);
 }
 
 DBImpl::~DBImpl() {
@@ -333,6 +334,7 @@ DBImpl::~DBImpl() {
 
   if (mem_ != nullptr) mem_->Unref();
   imm_.UnrefAll();
+  LogFlush(options_.info_log);
 }
 
 // Do not flush and close database elegantly. Simulate a crash.
@@ -354,6 +356,7 @@ void DBImpl::TEST_Destroy_DBImpl() {
   bg_compaction_scheduled_ += LargeNumber;
 
   mutex_.Unlock();
+  LogFlush(options_.info_log);
 
   // force release the lock file.
   if (db_lock_ != nullptr) {
@@ -967,6 +970,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
                    table_cache_.get(), iter, &meta,
                    user_comparator(), newest_snapshot,
                    earliest_seqno_in_memtable, true);
+    LogFlush(options_.info_log);
     mutex_.Lock();
   }
 
@@ -1034,6 +1038,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
                    table_cache_.get(), iter, &meta,
                    user_comparator(), newest_snapshot,
                    earliest_seqno_in_memtable, enable_compression);
+    LogFlush(options_.info_log);
     mutex_.Lock();
   }
   base->Unref();
@@ -1153,6 +1158,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress) {
         }
         mutex_.Unlock();
         DeleteLogFile(log_num);
+        LogFlush(options_.info_log);
         mutex_.Lock();
       }
     }
@@ -1180,6 +1186,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end,
   if (reduce_level) {
     ReFitLevel(max_level_with_files, target_level);
   }
+  LogFlush(options_.info_log);
 }
 
 // return the same level if it cannot be moved
@@ -1638,6 +1645,7 @@ void DBImpl::BackgroundCallFlush() {
       Log(options_.info_log, "Waiting after background flush error: %s",
           s.ToString().c_str());
       mutex_.Unlock();
+      LogFlush(options_.info_log);
       env_->SleepForMicroseconds(1000000);
       mutex_.Lock();
     }
@@ -1675,6 +1683,7 @@ void DBImpl::BackgroundCallCompaction() {
       Log(options_.info_log, "Waiting after background compaction error: %s",
           s.ToString().c_str());
       mutex_.Unlock();
+      LogFlush(options_.info_log);
       env_->SleepForMicroseconds(1000000);
       mutex_.Lock();
     }
@@ -1685,6 +1694,7 @@ void DBImpl::BackgroundCallCompaction() {
     mutex_.Unlock();
     PurgeObsoleteFiles(deletion_state);
     EvictObsoleteFiles(deletion_state);
+    LogFlush(options_.info_log);
     mutex_.Lock();
 
   }
@@ -1905,6 +1915,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
     compact->builder.reset(
         GetTableBuilder(options_, compact->outfile.get(), compression_type));
   }
+  LogFlush(options_.info_log);
   return s;
 }
 
@@ -2102,6 +2113,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
     // TODO: remove memtable flush from normal compaction work
     if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
       const uint64_t imm_start = env_->NowMicros();
+      LogFlush(options_.info_log);
       mutex_.Lock();
       if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
         FlushMemTableToOutputFile();
@@ -2395,6 +2407,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
     stats.bytes_written += compact->outputs[i].file_size;
   }
 
+  LogFlush(options_.info_log);
   mutex_.Lock();
   stats_[compact->compaction->output_level()].Add(stats);
 
@@ -2479,6 +2492,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
 
   mutex_.Unlock();
+  LogFlush(options_.info_log);
   return internal_iter;
 }
 
@@ -2553,6 +2567,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
   current->Unref();
   mutex_.Unlock();
 
+  LogFlush(options_.info_log);
   // Note, tickers are atomic now - no lock protection needed any more.
   RecordTick(options_.statistics, NUMBER_KEYS_READ);
   RecordTick(options_.statistics, BYTES_READ, value->size());
@@ -2631,6 +2646,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
   current->Unref();
   mutex_.Unlock();
 
+  LogFlush(options_.info_log);
   RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS);
   RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys);
   RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead);
@@ -2770,6 +2786,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
         }
         SetTickerCount(options_.statistics, SEQUENCE_NUMBER, last_sequence);
       }
+      LogFlush(options_.info_log);
       mutex_.Lock();
       if (status.ok()) {
         versions_->SetLastSequence(last_sequence);
@@ -3358,6 +3375,7 @@ Status DBImpl::DeleteFile(std::string name) {
       FindObsoleteFiles(deletion_state);
     }
   } // lock released here
+  LogFlush(options_.info_log);
 
   if (status.ok()) {
     // remove files outside the db-lock
index 55293ca2e386caf88410da7abcb0bc6f1b98dcc2..1745922bb0176e53b79f94645f198d88ea404697 100644 (file)
@@ -1318,6 +1318,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
     // find offset in manifest file where this version is stored.
     new_manifest_file_size = descriptor_log_->file()->GetFileSize();
 
+    LogFlush(options_->info_log);
     mu->Lock();
     // cache the manifest_file_size so that it can be used to rollover in the
     // next call to LogAndApply
index c9b2befa5fc5742380b85fca99dc66a9cac711ad..824e61e589eef7e528e2a8c89880cc4ee6d229c6 100644 (file)
@@ -497,6 +497,8 @@ class Logger {
   virtual size_t GetLogFileSize() const {
     return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE;
   }
+  // Flush to the OS buffers
+  virtual void Flush() {}
 
  private:
   // No copying allowed
@@ -516,6 +518,9 @@ class FileLock {
   void operator=(const FileLock&);
 };
 
+
+extern void LogFlush(const shared_ptr<Logger>& info_log);
+
 // Log the specified data to *info_log if info_log is non-nullptr.
 extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...)
 #   if defined(__GNUC__) || defined(__clang__)
@@ -523,6 +528,8 @@ extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...)
 #   endif
     ;
 
+extern void LogFlush(Logger *info_log);
+
 extern void Log(Logger* info_log, const char* format, ...)
 #   if defined(__GNUC__) || defined(__clang__)
     __attribute__((__format__ (__printf__, 2, 3)))
index cf62730343438b88d42da1125cd6ffb76d17dae3..bd19d48eb3607f682895f44bf1cc547119ad237a 100644 (file)
@@ -30,6 +30,12 @@ Logger::~Logger() {
 FileLock::~FileLock() {
 }
 
+void LogFlush(Logger *info_log) {
+  if (info_log) {
+    info_log->Flush();
+  }
+}
+
 void Log(Logger* info_log, const char* format, ...) {
   if (info_log) {
     va_list ap;
@@ -39,6 +45,12 @@ void Log(Logger* info_log, const char* format, ...) {
   }
 }
 
+void LogFlush(const shared_ptr<Logger>& info_log) {
+  if (info_log) {
+    info_log->Flush();
+  }
+}
+
 void Log(const shared_ptr<Logger>& info_log, const char* format, ...) {
   if (info_log) {
     va_list ap;
index 482ec3786bccbe7b3ae6f99127a8faae6e1036cb..0a09bd1ebc560e36cf84feae7f94893e23f92231 100644 (file)
@@ -33,8 +33,8 @@ class PosixLogger : public Logger {
   uint64_t (*gettid_)();  // Return the thread id for the current thread
   std::atomic_size_t log_size_;
   int fd_;
-  const static uint64_t flush_every_seconds_ = 0;
-  uint64_t last_flush_micros_;
+  const static uint64_t flush_every_seconds_ = 5;
+  std::atomic_uint_fast64_t last_flush_micros_;
   Env* env_;
  public:
   PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env) :
@@ -43,6 +43,10 @@ class PosixLogger : public Logger {
   virtual ~PosixLogger() {
     fclose(file_);
   }
+  virtual void Flush() {
+    fflush(file_);
+    last_flush_micros_ = env_->NowMicros();
+  }
   virtual void Logv(const char* format, va_list ap) {
     const uint64_t thread_id = (*gettid_)();
 
@@ -122,13 +126,14 @@ class PosixLogger : public Logger {
       size_t sz = fwrite(base, 1, write_size, file_);
       assert(sz == write_size);
       if (sz > 0) {
-        if (env_->NowMicros() - last_flush_micros_ >=
-            flush_every_seconds_ * 1000000) {
-          fflush(file_);
-          last_flush_micros_ = env_->NowMicros();
-        }
         log_size_ += write_size;
       }
+      uint64_t now_micros = static_cast<uint64_t>(now_tv.tv_sec) * 1000000 +
+        now_tv.tv_usec;
+      if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
+        fflush(file_);
+        last_flush_micros_ = now_micros;
+      }
       if (base != buffer) {
         delete[] base;
       }