}
last_log_ts = 0;
+ LogFlush(options_.info_log);
}
DBImpl::~DBImpl() {
if (mem_ != nullptr) mem_->Unref();
imm_.UnrefAll();
+ LogFlush(options_.info_log);
}
// Do not flush and close database elegantly. Simulate a crash.
bg_compaction_scheduled_ += LargeNumber;
mutex_.Unlock();
+ LogFlush(options_.info_log);
// force release the lock file.
if (db_lock_ != nullptr) {
table_cache_.get(), iter, &meta,
user_comparator(), newest_snapshot,
earliest_seqno_in_memtable, true);
+ LogFlush(options_.info_log);
mutex_.Lock();
}
table_cache_.get(), iter, &meta,
user_comparator(), newest_snapshot,
earliest_seqno_in_memtable, enable_compression);
+ LogFlush(options_.info_log);
mutex_.Lock();
}
base->Unref();
}
mutex_.Unlock();
DeleteLogFile(log_num);
+ LogFlush(options_.info_log);
mutex_.Lock();
}
}
if (reduce_level) {
ReFitLevel(max_level_with_files, target_level);
}
+ LogFlush(options_.info_log);
}
// return the same level if it cannot be moved
Log(options_.info_log, "Waiting after background flush error: %s",
s.ToString().c_str());
mutex_.Unlock();
+ LogFlush(options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
}
Log(options_.info_log, "Waiting after background compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
+ LogFlush(options_.info_log);
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
}
mutex_.Unlock();
PurgeObsoleteFiles(deletion_state);
EvictObsoleteFiles(deletion_state);
+ LogFlush(options_.info_log);
mutex_.Lock();
}
compact->builder.reset(
GetTableBuilder(options_, compact->outfile.get(), compression_type));
}
+ LogFlush(options_.info_log);
return s;
}
// TODO: remove memtable flush from normal compaction work
if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
+ LogFlush(options_.info_log);
mutex_.Lock();
if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
FlushMemTableToOutputFile();
stats.bytes_written += compact->outputs[i].file_size;
}
+ LogFlush(options_.info_log);
mutex_.Lock();
stats_[compact->compaction->output_level()].Add(stats);
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
mutex_.Unlock();
+ LogFlush(options_.info_log);
return internal_iter;
}
current->Unref();
mutex_.Unlock();
+ LogFlush(options_.info_log);
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick(options_.statistics, NUMBER_KEYS_READ);
RecordTick(options_.statistics, BYTES_READ, value->size());
current->Unref();
mutex_.Unlock();
+ LogFlush(options_.info_log);
RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS);
RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys);
RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead);
}
SetTickerCount(options_.statistics, SEQUENCE_NUMBER, last_sequence);
}
+ LogFlush(options_.info_log);
mutex_.Lock();
if (status.ok()) {
versions_->SetLastSequence(last_sequence);
FindObsoleteFiles(deletion_state);
}
} // lock released here
+ LogFlush(options_.info_log);
if (status.ok()) {
// remove files outside the db-lock
virtual size_t GetLogFileSize() const {
return DO_NOT_SUPPORT_GET_LOG_FILE_SIZE;
}
+ // Flush to the OS buffers
+ virtual void Flush() {}
private:
// No copying allowed
void operator=(const FileLock&);
};
+
+extern void LogFlush(const shared_ptr<Logger>& info_log);
+
// Log the specified data to *info_log if info_log is non-nullptr.
extern void Log(const shared_ptr<Logger>& info_log, const char* format, ...)
# if defined(__GNUC__) || defined(__clang__)
# endif
;
+extern void LogFlush(Logger *info_log);
+
extern void Log(Logger* info_log, const char* format, ...)
# if defined(__GNUC__) || defined(__clang__)
__attribute__((__format__ (__printf__, 2, 3)))
uint64_t (*gettid_)(); // Return the thread id for the current thread
std::atomic_size_t log_size_;
int fd_;
- const static uint64_t flush_every_seconds_ = 0;
- uint64_t last_flush_micros_;
+ const static uint64_t flush_every_seconds_ = 5;
+ std::atomic_uint_fast64_t last_flush_micros_;
Env* env_;
public:
PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env) :
virtual ~PosixLogger() {
fclose(file_);
}
+ virtual void Flush() {
+ fflush(file_);
+ last_flush_micros_ = env_->NowMicros();
+ }
virtual void Logv(const char* format, va_list ap) {
const uint64_t thread_id = (*gettid_)();
size_t sz = fwrite(base, 1, write_size, file_);
assert(sz == write_size);
if (sz > 0) {
- if (env_->NowMicros() - last_flush_micros_ >=
- flush_every_seconds_ * 1000000) {
- fflush(file_);
- last_flush_micros_ = env_->NowMicros();
- }
log_size_ += write_size;
}
+ uint64_t now_micros = static_cast<uint64_t>(now_tv.tv_sec) * 1000000 +
+ now_tv.tv_usec;
+ if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
+ fflush(file_);
+ last_flush_micros_ = now_micros;
+ }
if (base != buffer) {
delete[] base;
}