void rocksdb_options_set_purge_redundant_kvs_while_flush(rocksdb_options_t* opt,
unsigned char v) {}
-void rocksdb_options_set_allow_os_buffer(rocksdb_options_t* opt,
- unsigned char v) {
- opt->rep.allow_os_buffer = v;
+void rocksdb_options_set_use_direct_reads(rocksdb_options_t* opt,
+ unsigned char v) {
+ opt->rep.use_direct_reads = v;
+}
+
+void rocksdb_options_set_use_direct_writes(rocksdb_options_t* opt,
+ unsigned char v) {
+ opt->rep.use_direct_writes = v;
}
void rocksdb_options_set_allow_mmap_reads(
void rocksdb_ratelimiter_destroy(rocksdb_ratelimiter_t *limiter) {
if (limiter->rep) {
- delete limiter->rep;
+ delete limiter->rep;
}
delete limiter;
}
"More than four DB paths are not supported yet. ");
}
- if (db_options.allow_mmap_reads && !db_options.allow_os_buffer) {
+ if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
// Protect against assert in PosixMMapReadableFile constructor
return Status::NotSupported(
"If memory mapped reads (allow_mmap_reads) are enabled "
- "then os caching (allow_os_buffer) must also be enabled. ");
+ "then direct I/O reads (use_direct_reads) must be disabled. ");
+ }
+
+ if (db_options.allow_mmap_writes && db_options.use_direct_writes) {
+ return Status::NotSupported(
+ "If memory mapped writes (allow_mmap_writes) are enabled "
+ "then direct I/O writes (use_direct_writes) must be disabled. ");
}
return Status::OK();
TEST_F(DBTest, MmapAndBufferOptions) {
Options options = CurrentOptions();
- options.allow_os_buffer = false;
+ options.use_direct_reads = true;
options.allow_mmap_reads = true;
ASSERT_NOK(TryReopen(options));
// All other combinations are acceptable
- options.allow_os_buffer = true;
+ options.use_direct_reads = false;
ASSERT_OK(TryReopen(options));
- options.allow_os_buffer = false;
- options.allow_mmap_reads = false;
- ASSERT_OK(TryReopen(options));
+ if (IsDirectIOSupported()) {
+ options.use_direct_reads = true;
+ options.allow_mmap_reads = false;
+ ASSERT_OK(TryReopen(options));
+ }
- options.allow_os_buffer = true;
+ options.use_direct_reads = false;
ASSERT_OK(TryReopen(options));
}
#endif
return DB::Open(options, dbname_, &db_);
}
+bool DBTestBase::IsDirectIOSupported() {
+ EnvOptions env_options;
+ env_options.use_mmap_writes = false;
+ env_options.use_direct_writes = true;
+ std::string tmp = TempFileName(dbname_, 999);
+ Status s;
+ {
+ unique_ptr<WritableFile> file;
+ s = env_->NewWritableFile(tmp, &file, env_options);
+ }
+ if (s.ok()) {
+ s = env_->DeleteFile(tmp);
+ }
+ return s.ok();
+}
+
Status DBTestBase::Flush(int cf) {
if (cf == 0) {
return db_->Flush(FlushOptions());
Status TryReopen(const Options& options);
+ bool IsDirectIOSupported();
+
Status Flush(int cf = 0);
Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions());
options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone;
options.level0_slowdown_writes_trigger = 99999;
options.level0_stop_writes_trigger = 99999;
- options.allow_os_buffer = false;
+ options.use_direct_writes = true;
options.write_buffer_size = FLAGS_memtable_size;
rocksdb::BlockBasedTableOptions table_options;
table_options.block_cache = rocksdb::NewLRUCache(FLAGS_block_cache_size);
error_if_exists=false
recycle_log_file_num=0
skip_log_error_on_recovery=false
- allow_mmap_reads=false
- allow_os_buffer=true
db_log_dir=
new_table_reader_for_compaction_inputs=true
+ allow_mmap_reads=false
allow_mmap_writes=false
-
+ use_direct_reads=false
+ use_direct_writes=false
+
[CFOptions "default"]
compaction_style=kCompactionStyleLevel
write_buffer_size=134217728
disable_auto_compactions=false
inplace_update_support=false
-
+
[TableOptions/BlockBasedTable "default"]
format_version=2
whole_key_filtering=true
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_purge_redundant_kvs_while_flush(rocksdb_options_t*,
unsigned char);
-extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_os_buffer(
- rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_mmap_reads(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_mmap_writes(
rocksdb_options_t*, unsigned char);
+extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_reads(
+ rocksdb_options_t*, unsigned char);
+extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_writes(
+ rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_is_fd_close_on_exec(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_skip_log_error_on_recovery(
// construct from Options
explicit EnvOptions(const DBOptions& options);
- // If true, then allow caching of data in environment buffers
- bool use_os_buffer = true;
-
// If true, then use mmap to read data
bool use_mmap_reads = false;
// OptimizeForManifestWrite will create a new EnvOptions object that is a copy
// of the EnvOptions in the parameters, but is optimized for writing manifest
// files. Default implementation returns the copy of the same object.
- virtual EnvOptions OptimizeForManifestWrite(const EnvOptions& env_options)
- const;
+ virtual EnvOptions OptimizeForManifestWrite(
+ const EnvOptions& env_options) const;
// Returns the status of all threads that belong to the current Env.
virtual Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
}
virtual ~WritableFile();
- // Indicates if the class makes use of unbuffered I/O
- // If false you must pass aligned buffer to Write()
- virtual bool UseOSBuffer() const {
- return true;
- }
+ // Indicates if the class makes use of direct IO
+ // If true you must pass aligned buffer to Write()
+ virtual bool UseDirectIO() const { return false; }
const size_t c_DefaultPageSize = 4 * 1024;
// Use the returned alignment value to allocate
- // aligned buffer for Write() when UseOSBuffer()
- // returns false
+ // aligned buffer for Write() when UseDirectIO()
+ // returns true
virtual size_t GetRequiredBufferAlignment() const {
return c_DefaultPageSize;
}
// the sector. The implementation thus needs to also rewrite the last
// partial sector.
- // Note: PositionAppend does not guarantee moving the file offset after the
- // write. A WriteabelFile object must support either Append or
+ // Note: PositionedAppend does not guarantee moving the file offset after the
+ // write. A WritableFile object must support either Append or
// PositionedAppend, so the users cannot mix the two.
//
// PositionedAppend() can only happen on the page/sector boundaries. For that
return false;
}
- // Indicates the upper layers if the current WritableFile implementation
- // uses direct IO.
- virtual bool UseDirectIO() const { return false; }
-
/*
* Change the priority in rate limiter if rate limiting is enabled.
* If rate limiting is not enabled, this call has no effect.
RandomRWFile() {}
virtual ~RandomRWFile() {}
- // Indicates if the class makes use of unbuffered I/O
- // If false you must pass aligned buffer to Write()
+ // Indicates if the class makes use of direct I/O
+ // If true you must pass aligned buffer to Write()
- virtual bool UseOSBuffer() const {
- return true;
- }
+ virtual bool UseDirectIO() const { return false; }
const size_t c_DefaultPageSize = 4 * 1024;
- // Use the returned alignment value to allocate
- // aligned buffer for Write() when UseOSBuffer()
- // returns false
+ // Use the returned alignment value to allocate aligned
+ // buffer for Write() when UseDirectIO() returns true
virtual size_t GetRequiredBufferAlignment() const {
return c_DefaultPageSize;
}
virtual void EnableReadAhead() {}
// Write bytes in `data` at offset `offset`, Returns Status::OK() on success.
- // Pass aligned buffer when UseOSBuffer() returns false.
+ // Pass aligned buffer when UseDirectIO() returns true.
virtual Status Write(uint64_t offset, const Slice& data) = 0;
// Read up to `n` bytes starting from offset `offset` and store them in
// large amounts of data (such as xfs's allocsize option).
size_t manifest_preallocation_size;
- // Hint the OS that it should not buffer disk I/O. Enabling this
- // parameter may improve performance but increases pressure on the
- // system cache.
- //
- // The exact behavior of this parameter is platform dependent.
- //
- // On POSIX systems, after RocksDB reads data from disk it will
- // mark the pages as "unneeded". The operating system may - or may not
- // - evict these pages from memory, reducing pressure on the system
- // cache. If the disk block is requested again this can result in
- // additional disk I/O.
- //
- // On WINDOWS system, files will be opened in "unbuffered I/O" mode
- // which means that data read from the disk will not be cached or
- // bufferized. The hardware buffer of the devices may however still
- // be used. Memory mapped files are not impacted by this parameter.
- //
- // Default: true
- bool allow_os_buffer;
-
// Allow the OS to mmap file for reading sst tables. Default: false
bool allow_mmap_reads;
// Default: false
bool allow_mmap_writes;
+ // Enable direct I/O mode for reads/writes.
+ // These options may or may not improve performance depending on the
+ // use case.
+ //
+ // Files will be opened in "direct I/O" mode, which means that data
+ // read from or written to the disk will not be cached or buffered by
+ // the OS. The hardware buffer of the devices may however still
+ // be used. Memory mapped files are not impacted by these parameters.
+
// Use O_DIRECT for reading file
// Default: false
bool use_direct_reads;
+ // Use O_DIRECT for writing file
+ // Default: false
+ bool use_direct_writes;
+
// If false, fallocate() calls are bypassed
bool allow_fallocate;
// a SstTable. Instead, buffer in WritableFileWriter will take
// care of the flushing when it is full.
//
- // On Windows, this option helps a lot when unbuffered I/O
- // (allow_os_buffer = false) is used, since it avoids small
- // unbuffered disk write.
+ // This option helps a lot when direct I/O writes
+ // (use_direct_writes = true) are used, since it avoids small
+ // direct disk writes.
//
// User may also adjust writable_file_max_buffer_size to optimize disk I/O
// size.
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_UTILITIES_ENV_LIBRADOS_H
#define ROCKSDB_UTILITIES_ENV_LIBRADOS_H
class LibradosWritableFile;
class EnvLibrados : public EnvWrapper {
-public:
+ public:
// Create a brand new sequentially-readable file with the specified name.
// On success, stores a pointer to the new file in *result and returns OK.
// On failure stores nullptr in *result and returns non-OK. If the file does
// not exist, returns a non-OK status.
//
// The returned file will only be accessed by one thread at a time.
- Status NewSequentialFile(
- const std::string& fname,
- std::unique_ptr<SequentialFile>* result,
- const EnvOptions& options);
+ Status NewSequentialFile(const std::string& fname,
+ std::unique_ptr<SequentialFile>* result,
+ const EnvOptions& options) override;
// Create a brand new random access read-only file with the
// specified name. On success, stores a pointer to the new file in
// status.
//
// The returned file may be concurrently accessed by multiple threads.
- Status NewRandomAccessFile(
- const std::string& fname,
- std::unique_ptr<RandomAccessFile>* result,
- const EnvOptions& options);
+ Status NewRandomAccessFile(const std::string& fname,
+ std::unique_ptr<RandomAccessFile>* result,
+ const EnvOptions& options) override;
// Create an object that writes to a new file with the specified
// name. Deletes any existing file with the same name and creates a
// returns non-OK.
//
// The returned file will only be accessed by one thread at a time.
- Status NewWritableFile(
- const std::string& fname,
- std::unique_ptr<WritableFile>* result,
- const EnvOptions& options);
+ Status NewWritableFile(const std::string& fname,
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& options) override;
// Reuse an existing file by renaming it and opening it as writable.
- Status ReuseWritableFile(
- const std::string& fname,
- const std::string& old_fname,
- std::unique_ptr<WritableFile>* result,
- const EnvOptions& options);
+ Status ReuseWritableFile(const std::string& fname,
+ const std::string& old_fname,
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& options) override;
// Create an object that represents a directory. Will fail if directory
// doesn't exist. If the directory exists, it will open the directory
// On success, stores a pointer to the new Directory in
// *result and returns OK. On failure stores nullptr in *result and
// returns non-OK.
- Status NewDirectory(
- const std::string& name,
- std::unique_ptr<Directory>* result);
+ Status NewDirectory(const std::string& name,
+ std::unique_ptr<Directory>* result) override;
// Returns OK if the named file exists.
// NotFound if the named file does not exist,
// the calling process does not have permission to determine
// whether this file exists, or if the path is invalid.
// IOError if an IO Error was encountered
- Status FileExists(const std::string& fname);
+ Status FileExists(const std::string& fname) override;
// Store in *result the names of the children of the specified directory.
// The names are relative to "dir".
// Original contents of *results are dropped.
- Status GetChildren(const std::string& dir,
- std::vector<std::string>* result);
+ Status GetChildren(const std::string& dir,
+ std::vector<std::string>* result) override;
// Delete the named file.
- Status DeleteFile(const std::string& fname);
+ Status DeleteFile(const std::string& fname) override;
// Create the specified directory. Returns error if directory exists.
- Status CreateDir(const std::string& dirname);
+ Status CreateDir(const std::string& dirname) override;
// Creates directory if missing. Return Ok if it exists, or successful in
// Creating.
- Status CreateDirIfMissing(const std::string& dirname);
+ Status CreateDirIfMissing(const std::string& dirname) override;
// Delete the specified directory.
- Status DeleteDir(const std::string& dirname);
+ Status DeleteDir(const std::string& dirname) override;
// Store the size of fname in *file_size.
- Status GetFileSize(const std::string& fname, uint64_t* file_size);
+ Status GetFileSize(const std::string& fname, uint64_t* file_size) override;
// Store the last modification time of fname in *file_mtime.
Status GetFileModificationTime(const std::string& fname,
- uint64_t* file_mtime);
+ uint64_t* file_mtime) override;
// Rename file src to target.
- Status RenameFile(const std::string& src,
- const std::string& target);
+ Status RenameFile(const std::string& src, const std::string& target) override;
// Hard Link file src to target.
- Status LinkFile(const std::string& src, const std::string& target);
+ Status LinkFile(const std::string& src, const std::string& target) override;
// Lock the specified file. Used to prevent concurrent access to
// the same db by multiple processes. On failure, stores nullptr in
Status UnlockFile(FileLock* lock);
// Get full directory name for this db.
- Status GetAbsolutePath(const std::string& db_path,
- std::string* output_path);
+ Status GetAbsolutePath(const std::string& db_path,
+ std::string* output_path) override;
// Generate unique id
std::string GenerateUniqueId();
const std::string& config_path,
const std::string& db_pool);
- explicit EnvLibrados(const std::string& client_name, // first 3 parameters are for RADOS client init
- const std::string& cluster_name,
- const uint64_t flags,
- const std::string& db_name,
- const std::string& config_path,
- const std::string& db_pool,
- const std::string& wal_dir,
- const std::string& wal_pool,
- const uint64_t write_buffer_size);
- ~EnvLibrados() {
- _rados.shutdown();
- }
-private:
+ explicit EnvLibrados(
+ const std::string& client_name, // first 3 parameters are
+ // for RADOS client init
+ const std::string& cluster_name, const uint64_t flags,
+ const std::string& db_name, const std::string& config_path,
+ const std::string& db_pool, const std::string& wal_dir,
+ const std::string& wal_pool, const uint64_t write_buffer_size);
+ ~EnvLibrados() { _rados.shutdown(); }
+
+ private:
std::string _client_name;
std::string _cluster_name;
uint64_t _flags;
- std::string _db_name; // get from user, readable string; Also used as db_id for db metadata
+ std::string _db_name; // get from user, readable string; Also used as db_id
+ // for db metadata
std::string _config_path;
- librados::Rados _rados; // RADOS client
+ librados::Rados _rados; // RADOS client
std::string _db_pool_name;
- librados::IoCtx _db_pool_ioctx; // IoCtx for connecting db_pool
- std::string _wal_dir; // WAL dir path
+ librados::IoCtx _db_pool_ioctx; // IoCtx for connecting db_pool
+ std::string _wal_dir; // WAL dir path
std::string _wal_pool_name;
- librados::IoCtx _wal_pool_ioctx; // IoCtx for connecting wal_pool
- uint64_t _write_buffer_size; // WritableFile buffer max size
+ librados::IoCtx _wal_pool_ioctx; // IoCtx for connecting wal_pool
+ uint64_t _write_buffer_size; // WritableFile buffer max size
/* private function to communicate with rados */
std::string _CreateFid();
Status _RenameFid(const std::string& old_fname, const std::string& new_fname);
Status _AddFid(const std::string& fname, const std::string& fid);
Status _DelFid(const std::string& fname);
- Status _GetSubFnames(
- const std::string& dirname,
- std::vector<std::string> * result
- );
+ Status _GetSubFnames(const std::string& dirname,
+ std::vector<std::string>* result);
librados::IoCtx* _GetIoctx(const std::string& prefix);
friend class LibradosWritableFile;
};
// semantics and behavior are correct (in that they match that of an
// existing, stable Env, like the default POSIX one).
-#ifndef ROCKSDB_LITE
+#pragma once
-#ifndef STORAGE_ROCKSDB_INCLUDE_UTILITIES_ENVMIRROR_H_
-#define STORAGE_ROCKSDB_INCLUDE_UTLIITIES_ENVMIRROR_H_
+#ifndef ROCKSDB_LITE
#include <iostream>
#include <algorithm>
} // namespace rocksdb
-#endif // STORAGE_ROCKSDB_INCLUDE_UTILITIES_ENVMIRROR_H_
-
#endif // ROCKSDB_LITE
}
},
/* TODO(yhchiang): enable the following
- bufferedio(rocksdb::EnvOptions().use_os_buffer,
- "Allow buffered io using OS buffers.") {
+ direct_reads(rocksdb::EnvOptions().use_direct_reads,
+ "Allow direct I/O reads.") {
@Override public Object parseValue(String value) {
return parseBoolean(value);
}
- },
+ },
+ direct_writes(rocksdb::EnvOptions().use_direct_writes,
+ "Allow direct I/O writes.") {
+ @Override public Object parseValue(String value) {
+ return parseBoolean(value);
+ }
+ },
*/
mmap_read(false,
"Allow reads to occur via mmap-ing files.") {
/*
* Class: org_rocksdb_EnvOptions
- * Method: setUseOsBuffer
+ * Method: setUseDirectReads
* Signature: (JZ)V
*/
-void Java_org_rocksdb_EnvOptions_setUseOsBuffer(JNIEnv *env, jobject jobj,
- jlong jhandle,
- jboolean use_os_buffer) {
- ENV_OPTIONS_SET_BOOL(jhandle, use_os_buffer);
+void Java_org_rocksdb_EnvOptions_setUseDirectReads(JNIEnv *env, jobject jobj,
+ jlong jhandle,
+ jboolean use_direct_reads) {
+ ENV_OPTIONS_SET_BOOL(jhandle, use_direct_reads);
}
/*
* Class: org_rocksdb_EnvOptions
- * Method: useOsBuffer
+ * Method: useDirectReads
* Signature: (J)Z
*/
-jboolean Java_org_rocksdb_EnvOptions_useOsBuffer(JNIEnv *env, jobject jobj,
- jlong jhandle) {
- return ENV_OPTIONS_GET(jhandle, use_os_buffer);
+jboolean Java_org_rocksdb_EnvOptions_useDirectReads(JNIEnv *env, jobject jobj,
+ jlong jhandle) {
+ return ENV_OPTIONS_GET(jhandle, use_direct_reads);
+}
+
+/*
+ * Class: org_rocksdb_EnvOptions
+ * Method: setUseDirectWrites
+ * Signature: (JZ)V
+ */
+void Java_org_rocksdb_EnvOptions_setUseDirectWrites(
+ JNIEnv *env, jobject jobj, jlong jhandle, jboolean use_direct_writes) {
+ ENV_OPTIONS_SET_BOOL(jhandle, use_direct_writes);
+}
+
+/*
+ * Class: org_rocksdb_EnvOptions
+ * Method: useDirectWrites
+ * Signature: (J)Z
+ */
+jboolean Java_org_rocksdb_EnvOptions_useDirectWrites(JNIEnv *env, jobject jobj,
+ jlong jhandle) {
+ return ENV_OPTIONS_GET(jhandle, use_direct_writes);
}
/*
return ENV_OPTIONS_GET(jhandle, use_mmap_writes);
}
-/*
- * Class: org_rocksdb_EnvOptions
- * Method: setUseDirectReads
- * Signature: (JZ)V
- */
-void Java_org_rocksdb_EnvOptions_setUseDirectReads(JNIEnv *env, jobject jobj,
- jlong jhandle,
- jboolean use_direct_reads) {
- ENV_OPTIONS_SET_BOOL(jhandle, use_direct_reads);
-}
-
-/*
- * Class: org_rocksdb_EnvOptions
- * Method: useDirectReads
- * Signature: (J)Z
- */
-jboolean Java_org_rocksdb_EnvOptions_useDirectReads(JNIEnv *env, jobject jobj,
- jlong jhandle) {
- return ENV_OPTIONS_GET(jhandle, use_direct_reads);
-}
-
-/*
- * Class: org_rocksdb_EnvOptions
- * Method: setUseDirectWrites
- * Signature: (JZ)V
- */
-void Java_org_rocksdb_EnvOptions_setUseDirectWrites(
- JNIEnv *env, jobject jobj, jlong jhandle, jboolean use_direct_writes) {
- ENV_OPTIONS_SET_BOOL(jhandle, use_direct_writes);
-}
-
-/*
- * Class: org_rocksdb_EnvOptions
- * Method: useDirectWrites
- * Signature: (J)Z
- */
-jboolean Java_org_rocksdb_EnvOptions_useDirectWrites(JNIEnv *env, jobject jobj,
- jlong jhandle) {
- return ENV_OPTIONS_GET(jhandle, use_direct_writes);
-}
-
/*
* Class: org_rocksdb_EnvOptions
* Method: setAllowFallocate
}
}
-/*
- * Class: org_rocksdb_Options
- * Method: allowOsBuffer
- * Signature: (J)Z
- */
-jboolean Java_org_rocksdb_Options_allowOsBuffer(
- JNIEnv* env, jobject jobj, jlong jhandle) {
- return reinterpret_cast<rocksdb::Options*>(jhandle)->allow_os_buffer;
-}
-
-/*
- * Class: org_rocksdb_Options
- * Method: setAllowOsBuffer
- * Signature: (JZ)V
- */
-void Java_org_rocksdb_Options_setAllowOsBuffer(
- JNIEnv* env, jobject jobj, jlong jhandle, jboolean allow_os_buffer) {
- reinterpret_cast<rocksdb::Options*>(jhandle)->allow_os_buffer =
- static_cast<bool>(allow_os_buffer);
-}
-
/*
* Method: setTableFactory
* Signature: (JJ)V
static_cast<bool>(allow_mmap_writes);
}
+/*
+ * Class: org_rocksdb_Options
+ * Method: useDirectReads
+ * Signature: (J)Z
+ */
+jboolean Java_org_rocksdb_Options_useDirectReads(JNIEnv* env, jobject jobj,
+ jlong jhandle) {
+ return reinterpret_cast<rocksdb::Options*>(jhandle)->use_direct_reads;
+}
+
+/*
+ * Class: org_rocksdb_Options
+ * Method: setUseDirectReads
+ * Signature: (JZ)V
+ */
+void Java_org_rocksdb_Options_setUseDirectReads(JNIEnv* env, jobject jobj,
+ jlong jhandle,
+ jboolean use_direct_reads) {
+ reinterpret_cast<rocksdb::Options*>(jhandle)->use_direct_reads =
+ static_cast<bool>(use_direct_reads);
+}
+
+/*
+ * Class: org_rocksdb_Options
+ * Method: useDirectWrites
+ * Signature: (J)Z
+ */
+jboolean Java_org_rocksdb_Options_useDirectWrites(JNIEnv* env, jobject jobj,
+ jlong jhandle) {
+ return reinterpret_cast<rocksdb::Options*>(jhandle)->use_direct_writes;
+}
+
+/*
+ * Class: org_rocksdb_Options
+ * Method: setUseDirectWrites
+ * Signature: (JZ)V
+ */
+void Java_org_rocksdb_Options_setUseDirectWrites(JNIEnv* env, jobject jobj,
+ jlong jhandle,
+ jboolean use_direct_writes) {
+ reinterpret_cast<rocksdb::Options*>(jhandle)->use_direct_writes =
+ static_cast<bool>(use_direct_writes);
+}
+
/*
* Class: org_rocksdb_Options
* Method: isFdCloseOnExec
/*
* Class: org_rocksdb_DBOptions
- * Method: setAllowOsBuffer
+ * Method: useDirectReads
+ * Signature: (J)Z
+ */
+jboolean Java_org_rocksdb_DBOptions_useDirectReads(JNIEnv* env, jobject jobj,
+ jlong jhandle) {
+ return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->use_direct_reads;
+}
+
+/*
+ * Class: org_rocksdb_DBOptions
+ * Method: setUseDirectReads
* Signature: (JZ)V
*/
-void Java_org_rocksdb_DBOptions_setAllowOsBuffer(
- JNIEnv* env, jobject jobj, jlong jhandle, jboolean allow_os_buffer) {
- reinterpret_cast<rocksdb::DBOptions*>(jhandle)->allow_os_buffer =
- static_cast<bool>(allow_os_buffer);
+void Java_org_rocksdb_DBOptions_setUseDirectReads(JNIEnv* env, jobject jobj,
+ jlong jhandle,
+ jboolean use_direct_reads) {
+ reinterpret_cast<rocksdb::DBOptions*>(jhandle)->use_direct_reads =
+ static_cast<bool>(use_direct_reads);
}
/*
* Class: org_rocksdb_DBOptions
- * Method: allowOsBuffer
+ * Method: useDirectWrites
* Signature: (J)Z
*/
-jboolean Java_org_rocksdb_DBOptions_allowOsBuffer(
- JNIEnv* env, jobject jobj, jlong jhandle) {
- return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->allow_os_buffer;
+jboolean Java_org_rocksdb_DBOptions_useDirectWrites(JNIEnv* env, jobject jobj,
+ jlong jhandle) {
+ return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->use_direct_writes;
+}
+
+/*
+ * Class: org_rocksdb_DBOptions
+ * Method: setUseDirectWrites
+ * Signature: (JZ)V
+ */
+void Java_org_rocksdb_DBOptions_setUseDirectWrites(JNIEnv* env, jobject jobj,
+ jlong jhandle,
+ jboolean use_direct_writes) {
+ reinterpret_cast<rocksdb::DBOptions*>(jhandle)->use_direct_writes =
+ static_cast<bool>(use_direct_writes);
}
/*
}
@Override
- public DBOptions setAllowOsBuffer(
- final boolean allowOsBuffer) {
+ public DBOptions setUseDirectReads(
+ final boolean useDirectReads) {
assert(isOwningHandle());
- setAllowOsBuffer(nativeHandle_, allowOsBuffer);
+ setUseDirectReads(nativeHandle_, useDirectReads);
return this;
}
@Override
- public boolean allowOsBuffer() {
+ public boolean useDirectReads() {
assert(isOwningHandle());
- return allowOsBuffer(nativeHandle_);
+ return useDirectReads(nativeHandle_);
+ }
+
+ @Override
+ public DBOptions setUseDirectWrites(
+ final boolean useDirectWrites) {
+ assert(isOwningHandle());
+ setUseDirectWrites(nativeHandle_, useDirectWrites);
+ return this;
+ }
+
+ @Override
+ public boolean useDirectWrites() {
+ assert(isOwningHandle());
+ return useDirectWrites(nativeHandle_);
}
@Override
private native void setManifestPreallocationSize(
long handle, long size) throws IllegalArgumentException;
private native long manifestPreallocationSize(long handle);
- private native void setAllowOsBuffer(
- long handle, boolean allowOsBuffer);
- private native boolean allowOsBuffer(long handle);
+ private native void setUseDirectReads(long handle, boolean useDirectReads);
+ private native boolean useDirectReads(long handle);
+ private native void setUseDirectWrites(long handle, boolean useDirectWrites);
+ private native boolean useDirectWrites(long handle);
private native void setAllowMmapReads(
long handle, boolean allowMmapReads);
private native boolean allowMmapReads(long handle);
long manifestPreallocationSize();
/**
- * Data being read from file storage may be buffered in the OS
- * Default: true
+ * Enable the OS to use direct I/O for reading sst tables.
+ * Default: false
*
- * @param allowOsBuffer if true, then OS buffering is allowed.
+ * @param useDirectReads if true, then direct read is enabled
* @return the instance of the current Object.
*/
- Object setAllowOsBuffer(boolean allowOsBuffer);
+ Object setUseDirectReads(boolean useDirectReads);
/**
- * Data being read from file storage may be buffered in the OS
- * Default: true
+ * Enable the OS to use direct I/O for reading sst tables.
+ * Default: false
+ *
+ * @return if true, then direct reads are enabled
+ */
+ boolean useDirectReads();
+
+ /**
+ * Enable the OS to use direct I/O for writing sst tables.
+ * Default: false
+ *
+ * @param useDirectWrites if true, then direct write is enabled
+ * @return the instance of the current Object.
+ */
+ Object setUseDirectWrites(boolean useDirectWrites);
+
+ /**
+ * Enable the OS to use direct I/O for writing sst tables.
+ * Default: false
*
- * @return if true, then OS buffering is allowed.
+ * @return if true, then direct writes are enabled
*/
- boolean allowOsBuffer();
+ boolean useDirectWrites();
/**
* Allow the OS to mmap file for reading sst tables.
}
@Override
- public boolean allowOsBuffer() {
+ public Options setUseDirectReads(final boolean useDirectReads) {
assert(isOwningHandle());
- return allowOsBuffer(nativeHandle_);
+ setUseDirectReads(nativeHandle_, useDirectReads);
+ return this;
+ }
+
+ @Override
+ public boolean useDirectReads() {
+ assert(isOwningHandle());
+ return useDirectReads(nativeHandle_);
}
@Override
- public Options setAllowOsBuffer(final boolean allowOsBuffer) {
+ public Options setUseDirectWrites(final boolean useDirectWrites) {
assert(isOwningHandle());
- setAllowOsBuffer(nativeHandle_, allowOsBuffer);
+ setUseDirectWrites(nativeHandle_, useDirectWrites);
return this;
}
+ @Override
+ public boolean useDirectWrites() {
+ assert(isOwningHandle());
+ return useDirectWrites(nativeHandle_);
+ }
+
+
@Override
public boolean allowMmapReads() {
assert(isOwningHandle());
private native void setManifestPreallocationSize(
long handle, long size) throws IllegalArgumentException;
private native long manifestPreallocationSize(long handle);
- private native void setAllowOsBuffer(
- long handle, boolean allowOsBuffer);
- private native boolean allowOsBuffer(long handle);
+ private native void setUseDirectReads(long handle, boolean useDirectReads);
+ private native boolean useDirectReads(long handle);
+ private native void setUseDirectWrites(long handle, boolean useDirectWrites);
+ private native boolean useDirectWrites(long handle);
private native void setAllowMmapReads(
long handle, boolean allowMmapReads);
private native boolean allowMmapReads(long handle);
}
@Test
- public void allowOsBuffer() {
+ public void useDirectReads() {
try(final DBOptions opt = new DBOptions()) {
final boolean boolValue = rand.nextBoolean();
- opt.setAllowOsBuffer(boolValue);
- assertThat(opt.allowOsBuffer()).isEqualTo(boolValue);
+ opt.setUseDirectReads(boolValue);
+ assertThat(opt.useDirectReads()).isEqualTo(boolValue);
+ }
+ }
+
+ @Test
+ public void useDirectWrites() {
+ try(final DBOptions opt = new DBOptions()) {
+ final boolean boolValue = rand.nextBoolean();
+ opt.setUseDirectWrites(boolValue);
+ assertThat(opt.useDirectWrites()).isEqualTo(boolValue);
}
}
public static final Random rand = PlatformRandomHelper.getPlatformSpecificRandomFactory();
- @Test
- public void useOsBuffer() {
- try (final EnvOptions envOptions = new EnvOptions()) {
- final boolean boolValue = rand.nextBoolean();
- envOptions.setUseOsBuffer(boolValue);
- assertThat(envOptions.useOsBuffer()).isEqualTo(boolValue);
- }
- }
-
@Test
public void useMmapReads() {
try (final EnvOptions envOptions = new EnvOptions()) {
}
@Test
- public void allowOsBuffer() {
- try (final Options opt = new Options()) {
+ public void useDirectReads() {
+ try(final Options opt = new Options()) {
+ final boolean boolValue = rand.nextBoolean();
+ opt.setUseDirectReads(boolValue);
+ assertThat(opt.useDirectReads()).isEqualTo(boolValue);
+ }
+ }
+
+ @Test
+ public void useDirectWrites() {
+ try(final Options opt = new Options()) {
final boolean boolValue = rand.nextBoolean();
- opt.setAllowOsBuffer(boolValue);
- assertThat(opt.allowOsBuffer()).isEqualTo(boolValue);
+ opt.setUseDirectWrites(boolValue);
+ assertThat(opt.useDirectWrites()).isEqualTo(boolValue);
}
}
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "port/win/env_win.h"
#include <algorithm>
-#include <thread>
#include <ctime>
+#include <thread>
#include <errno.h>
#include <process.h> // _getpid
#include "port/dirent.h"
#include "port/win/win_logger.h"
#include "port/win/io_win.h"
-#include "port/win/env_win.h"
#include "util/iostats_context_imp.h"
// Random access is to disable read-ahead as the system reads too much data
DWORD fileFlags = FILE_ATTRIBUTE_READONLY;
- if (!options.use_os_buffer && !options.use_mmap_reads) {
+ if (options.use_direct_reads && !options.use_mmap_reads) {
fileFlags |= FILE_FLAG_NO_BUFFERING;
} else {
fileFlags |= FILE_FLAG_RANDOM_ACCESS;
}
Status WinEnvIO::NewWritableFile(const std::string& fname,
- std::unique_ptr<WritableFile>* result,
- const EnvOptions& options) {
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& options) {
const size_t c_BufferCapacity = 64 * 1024;
EnvOptions local_options(options);
DWORD fileFlags = FILE_ATTRIBUTE_NORMAL;
- if (!local_options.use_os_buffer && !local_options.use_mmap_writes) {
+ if (local_options.use_direct_writes && !local_options.use_mmap_writes) {
fileFlags = FILE_FLAG_NO_BUFFERING;
}
DWORD creation_disposition = OPEN_ALWAYS; // Create if necessary or open existing
DWORD file_flags = FILE_FLAG_RANDOM_ACCESS;
- if (!options.use_os_buffer) {
+ if (options.use_direct_reads && options.use_direct_writes) {
file_flags |= FILE_FLAG_NO_BUFFERING;
}
EnvOptions WinEnvIO::OptimizeForLogWrite(const EnvOptions& env_options,
const DBOptions& db_options) const {
EnvOptions optimized = env_options;
- optimized.use_mmap_writes = false;
optimized.bytes_per_sync = db_options.wal_bytes_per_sync;
- optimized.use_os_buffer =
- true; // This is because we flush only whole pages on unbuffered io and
+ optimized.use_mmap_writes = false;
+ // This is because we flush only whole pages on unbuffered io and
// the last records are not guaranteed to be flushed.
+ optimized.use_direct_writes = false;
// TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it
// breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
// test and make this false
const EnvOptions& env_options) const {
EnvOptions optimized = env_options;
optimized.use_mmap_writes = false;
- optimized.use_os_buffer = true;
+ optimized.use_direct_writes = false;
optimized.fallocate_with_keep_size = true;
return optimized;
}
}
Status WinEnv::NewWritableFile(const std::string& fname,
- std::unique_ptr<WritableFile>* result,
- const EnvOptions& options) {
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& options) {
return winenv_io_.NewWritableFile(fname, result, options);
}
const EnvOptions& options);
virtual Status NewWritableFile(const std::string& fname,
- std::unique_ptr<WritableFile>* result,
- const EnvOptions& options);
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& options);
// The returned file will only be accessed by one thread at a time.
virtual Status NewRandomRWFile(const std::string& fname,
const EnvOptions& options) override;
Status NewWritableFile(const std::string& fname,
- std::unique_ptr<WritableFile>* result,
- const EnvOptions& options) override;
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& options) override;
// The returned file will only be accessed by one thread at a time.
Status NewRandomRWFile(const std::string& fname,
#include "util/sync_point.h"
#include "util/coding.h"
#include "util/iostats_context_imp.h"
-#include "util/sync_point.h"
#include "util/aligned_buffer.h"
////////////////////////////////////////////////////////////////////////////////////////////////////
// WinMmapReadableFile
-WinMmapReadableFile::WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap,
- const void* mapped_region, size_t length)
- : WinFileData(fileName, hFile, false),
- hMap_(hMap),
- mapped_region_(mapped_region),
- length_(length) {}
+WinMmapReadableFile::WinMmapReadableFile(const std::string& fileName,
+ HANDLE hFile, HANDLE hMap,
+ const void* mapped_region,
+ size_t length)
+ : WinFileData(fileName, hFile, false /* use_direct_io */),
+ hMap_(hMap),
+ mapped_region_(mapped_region),
+ length_(length) {}
WinMmapReadableFile::~WinMmapReadableFile() {
BOOL ret = ::UnmapViewOfFile(mapped_region_);
// WinSequentialFile
WinSequentialFile::WinSequentialFile(const std::string& fname, HANDLE f,
- const EnvOptions& options)
- : WinFileData(fname, f, options.use_os_buffer)
-{}
+ const EnvOptions& options)
+ : WinFileData(fname, f, options.use_direct_reads) {}
WinSequentialFile::~WinSequentialFile() {
assert(hFile_ != INVALID_HANDLE_VALUE);
assert(!options.use_mmap_reads);
- // Unbuffered access, use internal buffer for reads
- if (!file_base_->UseOSBuffer()) {
+ // Direct access, use internal buffer for reads
+ if (file_base_->UseDirectIO()) {
// Do not allocate the buffer either until the first request or
// until there is a call to allocate a read-ahead buffer
buffer_.Alignment(alignment);
return s;
}
- // When in unbuffered mode we need to do the following changes:
+ // When in direct I/O mode we need to do the following changes:
// - use our own aligned buffer
// - always read at an offset that is a multiple of alignment
- if (!file_base_->UseOSBuffer()) {
-
+ if (file_base_->UseDirectIO()) {
uint64_t first_page_start = 0;
size_t actual_bytes_toread = 0;
size_t bytes_requested = left;
inline
void WinRandomAccessImpl::HintImpl(RandomAccessFile::AccessPattern pattern) {
-
- if (pattern == RandomAccessFile::SEQUENTIAL &&
- !file_base_->UseOSBuffer() &&
- compaction_readahead_size_ > 0) {
+ if (pattern == RandomAccessFile::SEQUENTIAL && file_base_->UseDirectIO() &&
+ compaction_readahead_size_ > 0) {
std::lock_guard<std::mutex> lg(buffer_mut_);
if (!read_ahead_) {
read_ahead_ = true;
///////////////////////////////////////////////////////////////////////////////////////////////////
/// WinRandomAccessFile
-WinRandomAccessFile::WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
- const EnvOptions& options) :
- WinFileData(fname, hFile, options.use_os_buffer),
- WinRandomAccessImpl(this, alignment, options) {
-}
+WinRandomAccessFile::WinRandomAccessFile(const std::string& fname, HANDLE hFile,
+ size_t alignment,
+ const EnvOptions& options)
+ : WinFileData(fname, hFile, options.use_direct_reads),
+ WinRandomAccessImpl(this, alignment, options) {}
WinRandomAccessFile::~WinRandomAccessFile() {
}
Status WinWritableImpl::AppendImpl(const Slice& data) {
// Used for buffered access ONLY
- assert(file_data_->UseOSBuffer());
+ assert(!file_data_->UseDirectIO());
assert(data.size() < std::numeric_limits<DWORD>::max());
Status s;
}
else {
assert(size_t(ret) == data.size());
- // For sequential write this would be simple
+ // For sequential write this would be simple
// size extension by data.size()
uint64_t write_end = offset + data.size();
if (write_end >= filesize_) {
// Calls flush buffers
if (fsync(file_data_->GetFileHandle()) < 0) {
auto lastError = GetLastError();
- s = IOErrorFromWindowsError("fsync failed at Sync() for: " +
- file_data_->GetName(),
- lastError);
+ s = IOErrorFromWindowsError(
+ "fsync failed at Sync() for: " + file_data_->GetName(), lastError);
}
return s;
}
////////////////////////////////////////////////////////////////////////////////
/// WinWritableFile
-WinWritableFile::WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment,
- size_t /* capacity */, const EnvOptions& options)
- : WinFileData(fname, hFile, options.use_os_buffer),
- WinWritableImpl(this, alignment) {
-
+WinWritableFile::WinWritableFile(const std::string& fname, HANDLE hFile,
+ size_t alignment, size_t /* capacity */,
+ const EnvOptions& options)
+ : WinFileData(fname, hFile, options.use_direct_writes),
+ WinWritableImpl(this, alignment) {
assert(!options.use_mmap_writes);
}
WinWritableFile::~WinWritableFile() {
}
- // Indicates if the class makes use of unbuffered I/O
-bool WinWritableFile::UseOSBuffer() const {
- return WinFileData::UseOSBuffer();
-}
+// Indicates if the class makes use of direct I/O
+bool WinWritableFile::UseDirectIO() const { return WinFileData::UseDirectIO(); }
size_t WinWritableFile::GetRequiredBufferAlignment() const {
return GetAlignement();
return SyncImpl();
}
-Status WinWritableFile::Fsync() {
- return SyncImpl();
-}
+Status WinWritableFile::Fsync() { return SyncImpl(); }
uint64_t WinWritableFile::GetFileSize() {
return GetFileSizeImpl();
/////////////////////////////////////////////////////////////////////////
/// WinRandomRWFile
-WinRandomRWFile::WinRandomRWFile(const std::string& fname, HANDLE hFile, size_t alignment,
- const EnvOptions& options) :
- WinFileData(fname, hFile, options.use_os_buffer),
- WinRandomAccessImpl(this, alignment, options),
- WinWritableImpl(this, alignment) {
-
-}
+WinRandomRWFile::WinRandomRWFile(const std::string& fname, HANDLE hFile,
+ size_t alignment, const EnvOptions& options)
+ : WinFileData(fname, hFile,
+ options.use_direct_reads && options.use_direct_writes),
+ WinRandomAccessImpl(this, alignment, options),
+ WinWritableImpl(this, alignment) {}
-bool WinRandomRWFile::UseOSBuffer() const {
- return WinFileData::UseOSBuffer();
-}
+bool WinRandomRWFile::UseDirectIO() const { return WinFileData::UseDirectIO(); }
size_t WinRandomRWFile::GetRequiredBufferAlignment() const {
return GetAlignement();
return PositionedAppendImpl(data, offset);
}
-Status WinRandomRWFile::Read(uint64_t offset, size_t n, Slice * result,
- char * scratch) const {
+Status WinRandomRWFile::Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const {
return ReadImpl(offset, n, result, scratch);
}
}
}
-
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
-#include <rocksdb/Status.h>
-#include <rocksdb/env.h>
+#include <stdint.h>
+#include <mutex>
+#include <string>
+#include "rocksdb/Status.h"
+#include "rocksdb/env.h"
#include "util/aligned_buffer.h"
-#include <string>
-#include <stdint.h>
-
#include <Windows.h>
-#include <mutex>
namespace rocksdb {
namespace port {
std::string GetWindowsErrSz(DWORD err);
inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) {
- return ((err == ERROR_HANDLE_DISK_FULL) || (err == ERROR_DISK_FULL)) ?
- Status::NoSpace(context, GetWindowsErrSz(err)) :
- Status::IOError(context, GetWindowsErrSz(err));
+ return ((err == ERROR_HANDLE_DISK_FULL) || (err == ERROR_DISK_FULL))
+ ? Status::NoSpace(context, GetWindowsErrSz(err))
+ : Status::IOError(context, GetWindowsErrSz(err));
}
inline Status IOErrorFromLastWindowsError(const std::string& context) {
}
inline Status IOError(const std::string& context, int err_number) {
- return (err_number == ENOSPC) ?
- Status::NoSpace(context, strerror(err_number)) :
- Status::IOError(context, strerror(err_number));
+ return (err_number == ENOSPC)
+ ? Status::NoSpace(context, strerror(err_number))
+ : Status::IOError(context, strerror(err_number));
}
// Note the below two do not set errno because they are used only here in this
return 0;
}
-SSIZE_T pwrite(HANDLE hFile, const char* src, size_t numBytes,
- uint64_t offset);
+SSIZE_T pwrite(HANDLE hFile, const char* src, size_t numBytes, uint64_t offset);
SSIZE_T pread(HANDLE hFile, char* src, size_t numBytes, uint64_t offset);
-Status fallocate(const std::string& filename, HANDLE hFile,
- uint64_t to_size);
-
-Status ftruncate(const std::string& filename, HANDLE hFile,
- uint64_t toSize);
+Status fallocate(const std::string& filename, HANDLE hFile, uint64_t to_size);
+Status ftruncate(const std::string& filename, HANDLE hFile, uint64_t toSize);
size_t GetUniqueIdFromFile(HANDLE hFile, char* id, size_t max_size);
class WinFileData {
-protected:
-
+ protected:
const std::string filename_;
HANDLE hFile_;
- // There is no equivalent of advising away buffered pages as in posix.
- // To implement this flag we would need to do unbuffered reads which
+ // If true, the I/O issued would be direct I/O, for which the buffer
// will need to be aligned (not sure there is a guarantee that the buffer
// passed in is aligned).
- // Hence we currently ignore this flag. It is used only in a few cases
- // which should not be perf critical.
- // If perf evaluation finds this to be a problem, we can look into
- // implementing this.
- const bool use_os_buffer_;
-
-public:
+ const bool use_direct_io_;
+ public:
// We want this class to be usable both for inheritance (private
// or protected) and for containment, so __ctor and __dtor are public
- WinFileData(const std::string& filename, HANDLE hFile, bool use_os_buffer) :
- filename_(filename), hFile_(hFile), use_os_buffer_(use_os_buffer)
- {}
+ WinFileData(const std::string& filename, HANDLE hFile, bool use_direct_io)
+ : filename_(filename), hFile_(hFile), use_direct_io_(use_direct_io) {}
- virtual ~WinFileData() {
- this->CloseFile();
- }
+ virtual ~WinFileData() { this->CloseFile(); }
bool CloseFile() {
-
bool result = true;
if (hFile_ != NULL && hFile_ != INVALID_HANDLE_VALUE) {
HANDLE GetFileHandle() const { return hFile_; }
- bool UseOSBuffer() const { return use_os_buffer_; }
+ bool UseDirectIO() const { return use_direct_io_; }
WinFileData(const WinFileData&) = delete;
WinFileData& operator=(const WinFileData&) = delete;
};
-
// mmap() based random-access
class WinMmapReadableFile : private WinFileData, public RandomAccessFile {
HANDLE hMap_;
const void* mapped_region_;
const size_t length_;
-public:
+ public:
// mapped_region_[0,length-1] contains the mmapped contents of the file.
WinMmapReadableFile(const std::string& fileName, HANDLE hFile, HANDLE hMap,
- const void* mapped_region, size_t length);
+ const void* mapped_region, size_t length);
~WinMmapReadableFile();
WinMmapReadableFile& operator=(const WinMmapReadableFile&) = delete;
virtual Status Read(uint64_t offset, size_t n, Slice* result,
- char* scratch) const override;
+ char* scratch) const override;
virtual Status InvalidateCache(size_t offset, size_t length) override;
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
class WinMmapFile : private WinFileData, public WritableFile {
-private:
+ private:
HANDLE hMap_;
const size_t page_size_; // We flush the mapping view in page_size
// increments. We may decide if this is a memory
// page size or SSD page size
const size_t
- allocation_granularity_; // View must start at such a granularity
+ allocation_granularity_; // View must start at such a granularity
- size_t reserved_size_; // Preallocated size
+ size_t reserved_size_; // Preallocated size
- size_t mapping_size_; // The max size of the mapping object
+ size_t mapping_size_; // The max size of the mapping object
// we want to guess the final file size to minimize the remapping
- size_t view_size_; // How much memory to map into a view at a time
+ size_t view_size_; // How much memory to map into a view at a time
char* mapped_begin_; // Must begin at the file offset that is aligned with
// allocation_granularity_
virtual Status PreallocateInternal(uint64_t spaceToReserve);
-public:
-
+ public:
WinMmapFile(const std::string& fname, HANDLE hFile, size_t page_size,
- size_t allocation_granularity, const EnvOptions& options);
+ size_t allocation_granularity, const EnvOptions& options);
~WinMmapFile();
};
class WinSequentialFile : private WinFileData, public SequentialFile {
-public:
+ public:
WinSequentialFile(const std::string& fname, HANDLE f,
- const EnvOptions& options);
+ const EnvOptions& options);
~WinSequentialFile();
};
class WinRandomAccessImpl {
-protected:
-
+ protected:
WinFileData* file_base_;
- bool read_ahead_;
+ bool read_ahead_;
const size_t compaction_readahead_size_;
const size_t random_access_max_buffer_size_;
- mutable std::mutex buffer_mut_;
+ mutable std::mutex buffer_mut_;
mutable AlignedBuffer buffer_;
mutable uint64_t
- buffered_start_; // file offset set that is currently buffered
+ buffered_start_; // file offset set that is currently buffered
// Override for behavior change when creating a custom env
virtual SSIZE_T PositionedReadInternal(char* src, size_t numBytes,
- uint64_t offset) const;
-
- /*
- * The function reads a requested amount of bytes into the specified aligned
- * buffer Upon success the function sets the length of the buffer to the
- * amount of bytes actually read even though it might be less than actually
- * requested. It then copies the amount of bytes requested by the user (left)
- * to the user supplied buffer (dest) and reduces left by the amount of bytes
- * copied to the user buffer
- *
- * @user_offset [in] - offset on disk where the read was requested by the user
- * @first_page_start [in] - actual page aligned disk offset that we want to
- * read from
- * @bytes_to_read [in] - total amount of bytes that will be read from disk
- * which is generally greater or equal to the amount
- * that the user has requested due to the
- * either alignment requirements or read_ahead in
- * effect.
- * @left [in/out] total amount of bytes that needs to be copied to the user
- * buffer. It is reduced by the amount of bytes that actually
- * copied
- * @buffer - buffer to use
- * @dest - user supplied buffer
- */
+ uint64_t offset) const;
+
+ /*
+ * The function reads a requested amount of bytes into the specified aligned
+ * buffer Upon success the function sets the length of the buffer to the
+ * amount of bytes actually read even though it might be less than actually
+ * requested. It then copies the amount of bytes requested by the user (left)
+ * to the user supplied buffer (dest) and reduces left by the amount of bytes
+ * copied to the user buffer
+ *
+ * @user_offset [in] - offset on disk where the read was requested by the user
+ * @first_page_start [in] - actual page aligned disk offset that we want to
+ * read from
+ * @bytes_to_read [in] - total amount of bytes that will be read from disk
+ * which is generally greater or equal to the amount
+ * that the user has requested due to the
+ * either alignment requirements or read_ahead in
+ * effect.
+ * @left [in/out] total amount of bytes that needs to be copied to the user
+ * buffer. It is reduced by the amount of bytes that actually
+ * copied
+ * @buffer - buffer to use
+ * @dest - user supplied buffer
+ */
SSIZE_T ReadIntoBuffer(uint64_t user_offset, uint64_t first_page_start,
- size_t bytes_to_read, size_t& left,
- AlignedBuffer& buffer, char* dest) const;
+ size_t bytes_to_read, size_t& left,
+ AlignedBuffer& buffer, char* dest) const;
SSIZE_T ReadIntoOneShotBuffer(uint64_t user_offset, uint64_t first_page_start,
- size_t bytes_to_read, size_t& left,
- char* dest) const;
+ size_t bytes_to_read, size_t& left,
+ char* dest) const;
SSIZE_T ReadIntoInstanceBuffer(uint64_t user_offset,
- uint64_t first_page_start,
- size_t bytes_to_read, size_t& left,
- char* dest) const;
+ uint64_t first_page_start,
+ size_t bytes_to_read, size_t& left,
+ char* dest) const;
WinRandomAccessImpl(WinFileData* file_base, size_t alignment,
- const EnvOptions& options);
+ const EnvOptions& options);
virtual ~WinRandomAccessImpl() {}
-public:
-
+ public:
WinRandomAccessImpl(const WinRandomAccessImpl&) = delete;
WinRandomAccessImpl& operator=(const WinRandomAccessImpl&) = delete;
-
Status ReadImpl(uint64_t offset, size_t n, Slice* result,
- char* scratch) const;
+ char* scratch) const;
void HintImpl(RandomAccessFile::AccessPattern pattern);
};
// pread() based random-access
-class WinRandomAccessFile : private WinFileData,
- protected WinRandomAccessImpl, // Want to be able to override PositionedReadInternal
- public RandomAccessFile {
-
-public:
+class WinRandomAccessFile
+ : private WinFileData,
+ protected WinRandomAccessImpl, // Want to be able to override
+ // PositionedReadInternal
+ public RandomAccessFile {
+ public:
WinRandomAccessFile(const std::string& fname, HANDLE hFile, size_t alignment,
- const EnvOptions& options);
+ const EnvOptions& options);
~WinRandomAccessFile();
virtual void EnableReadAhead() override;
virtual Status Read(uint64_t offset, size_t n, Slice* result,
- char* scratch) const override;
+ char* scratch) const override;
virtual bool ShouldForwardRawRequest() const override;
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
};
-
// This is a sequential write class. It has been mimicked (as others) after
// the original Posix class. We add support for unbuffered I/O on windows as
// well
// No padding is required for
// buffered access.
class WinWritableImpl {
-protected:
-
- WinFileData* file_data_;
- const uint64_t alignment_;
- uint64_t filesize_; // How much data is actually written disk
- uint64_t reservedsize_; // how far we have reserved space
+ protected:
+ WinFileData* file_data_;
+ const uint64_t alignment_;
+ uint64_t filesize_; // How much data is actually written disk
+ uint64_t reservedsize_; // how far we have reserved space
virtual Status PreallocateInternal(uint64_t spaceToReserve);
Status AppendImpl(const Slice& data);
- // Requires that the data is aligned as specified by GetRequiredBufferAlignment()
+ // Requires that the data is aligned as specified by
+ // GetRequiredBufferAlignment()
Status PositionedAppendImpl(const Slice& data, uint64_t offset);
Status TruncateImpl(uint64_t size);
uint64_t GetFileSizeImpl() {
// Double accounting now here with WritableFileWriter
// and this size will be wrong when unbuffered access is used
- // but tests implement their own writable files and do not use WritableFileWrapper
+ // but tests implement their own writable files and do not use
+ // WritableFileWrapper
// so we need to squeeze a square peg through
// a round hole here.
return filesize_;
Status AllocateImpl(uint64_t offset, uint64_t len);
-public:
-
+ public:
WinWritableImpl(const WinWritableImpl&) = delete;
WinWritableImpl& operator=(const WinWritableImpl&) = delete;
};
-
class WinWritableFile : private WinFileData,
- protected WinWritableImpl,
- public WritableFile {
-
-public:
+ protected WinWritableImpl,
+ public WritableFile {
+ public:
WinWritableFile(const std::string& fname, HANDLE hFile, size_t alignment,
- size_t capacity, const EnvOptions& options);
+ size_t capacity, const EnvOptions& options);
~WinWritableFile();
- // Indicates if the class makes use of unbuffered I/O
+ // Indicates if the class makes use of direct I/O
// Use PositionedAppend
- virtual bool UseOSBuffer() const override;
+ virtual bool UseDirectIO() const override;
virtual size_t GetRequiredBufferAlignment() const override;
virtual Status Append(const Slice& data) override;
- // Requires that the data is aligned as specified by GetRequiredBufferAlignment()
+ // Requires that the data is aligned as specified by
+ // GetRequiredBufferAlignment()
virtual Status PositionedAppend(const Slice& data, uint64_t offset) override;
// Need to implement this so the file is truncated correctly
virtual size_t GetUniqueId(char* id, size_t max_size) const override;
};
-
class WinRandomRWFile : private WinFileData,
- protected WinRandomAccessImpl,
- protected WinWritableImpl,
- public RandomRWFile {
-
-public:
-
+ protected WinRandomAccessImpl,
+ protected WinWritableImpl,
+ public RandomRWFile {
+ public:
WinRandomRWFile(const std::string& fname, HANDLE hFile, size_t alignment,
- const EnvOptions& options);
+ const EnvOptions& options);
~WinRandomRWFile() {}
- // Indicates if the class makes use of unbuffered I/O
+ // Indicates if the class makes use of direct I/O
// If true you must pass aligned buffer to Write()
- virtual bool UseOSBuffer() const override;
+ virtual bool UseDirectIO() const override;
- // Use the returned alignment value to allocate
- // aligned buffer for Write() when UseOSBuffer()
- // returns false
+ // Use the returned alignment value to allocate aligned
+ // buffer for Write() when UseDirectIO() returns true
virtual size_t GetRequiredBufferAlignment() const override;
// Used by the file_reader_writer to decide if the ReadAhead wrapper
- // should simply forward the call and do not enact read_ahead buffering or locking.
+ // should simply forward the call and do not enact read_ahead buffering or
+ // locking.
// The implementation below takes care of reading ahead
virtual bool ShouldForwardRawRequest() const override;
virtual void EnableReadAhead() override;
// Write bytes in `data` at offset `offset`, Returns Status::OK() on success.
- // Pass aligned buffer when UseOSBuffer() returns false.
+ // Pass aligned buffer when UseDirectIO() returns true.
virtual Status Write(uint64_t offset, const Slice& data) override;
// Read up to `n` bytes starting from offset `offset` and store them in
// result, provided `scratch` size should be at least `n`.
// Returns Status::OK() on success.
virtual Status Read(uint64_t offset, size_t n, Slice* result,
- char* scratch) const override;
+ char* scratch) const override;
virtual Status Flush() override;
virtual Status Close() override;
};
-
class WinDirectory : public Directory {
-public:
+ public:
WinDirectory() {}
virtual Status Fsync() override;
};
class WinFileLock : public FileLock {
-public:
+ public:
explicit WinFileLock(HANDLE hFile) : hFile_(hFile) {
assert(hFile != NULL);
assert(hFile != INVALID_HANDLE_VALUE);
~WinFileLock();
-private:
+ private:
HANDLE hFile_;
};
-
}
}
" in MB.");
DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");
-DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer,
- "Allow buffered io using OS buffers");
-
DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
"Allow reads to occur via mmap-ing files");
DEFINE_bool(use_direct_reads, rocksdb::EnvOptions().use_direct_reads,
"Use O_DIRECT for reading data");
+DEFINE_bool(use_direct_writes, rocksdb::EnvOptions().use_direct_writes,
+ "Use O_DIRECT for writing data");
+
DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
"Advise random access on table file open");
options.allow_mmap_reads = FLAGS_mmap_read;
options.allow_mmap_writes = FLAGS_mmap_write;
options.use_direct_reads = FLAGS_use_direct_reads;
+ options.use_direct_writes = FLAGS_use_direct_writes;
if (FLAGS_prefix_size != 0) {
options.prefix_extractor.reset(
NewFixedPrefixTransform(FLAGS_prefix_size));
options.optimize_filters_for_hits = FLAGS_optimize_filters_for_hits;
// fill storage options
- options.allow_os_buffer = FLAGS_bufferedio;
- options.allow_mmap_reads = FLAGS_mmap_read;
- options.allow_mmap_writes = FLAGS_mmap_write;
options.advise_random_on_open = FLAGS_advise_random_on_open;
options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e;
options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
max_background_flushes=1
create_if_missing=true
error_if_exists=false
- allow_os_buffer=true
delayed_write_rate=1048576
manifest_preallocation_size=4194304
+ allow_mmap_reads=false
allow_mmap_writes=false
+ use_direct_reads=false
+ use_direct_writes=false
stats_dump_period_sec=600
allow_fallocate=true
- allow_mmap_reads=false
max_log_file_size=83886080
random_access_max_buffer_size=1048576
advise_random_on_open=true
}
// This class is to manage an aligned user
-// allocated buffer for unbuffered I/O purposes
+// allocated buffer for direct I/O purposes
// though can be used for any purpose.
class AlignedBuffer {
size_t alignment_;
wal_ttl_seconds(options.WAL_ttl_seconds),
wal_size_limit_mb(options.WAL_size_limit_MB),
manifest_preallocation_size(options.manifest_preallocation_size),
- allow_os_buffer(options.allow_os_buffer),
allow_mmap_reads(options.allow_mmap_reads),
allow_mmap_writes(options.allow_mmap_writes),
use_direct_reads(options.use_direct_reads),
+ use_direct_writes(options.use_direct_writes),
allow_fallocate(options.allow_fallocate),
is_fd_close_on_exec(options.is_fd_close_on_exec),
stats_dump_period_sec(options.stats_dump_period_sec),
Header(log,
" Options.recycle_log_file_num: %" ROCKSDB_PRIszt,
recycle_log_file_num);
- Header(log, " Options.allow_os_buffer: %d",
- allow_os_buffer);
- Header(log, " Options.allow_mmap_reads: %d",
- allow_mmap_reads);
Header(log, " Options.allow_fallocate: %d",
allow_fallocate);
+ Header(log, " Options.allow_mmap_reads: %d",
+ allow_mmap_reads);
Header(log, " Options.allow_mmap_writes: %d",
allow_mmap_writes);
Header(log, " Options.use_direct_reads: %d",
use_direct_reads);
+ Header(log, " Options.use_direct_writes: %d",
+ use_direct_writes);
Header(log, " Options.create_missing_column_families: %d",
create_missing_column_families);
Header(log, " Options.db_log_dir: %s",
Header(log,
" Options.manifest_preallocation_size: %" ROCKSDB_PRIszt,
manifest_preallocation_size);
- Header(log, " Options.allow_os_buffer: %d",
- allow_os_buffer);
- Header(log, " Options.allow_mmap_reads: %d",
- allow_mmap_reads);
- Header(log, " Options.allow_mmap_writes: %d",
- allow_mmap_writes);
Header(log, " Options.is_fd_close_on_exec: %d",
is_fd_close_on_exec);
Header(log, " Options.stats_dump_period_sec: %u",
uint64_t wal_ttl_seconds;
uint64_t wal_size_limit_mb;
size_t manifest_preallocation_size;
- bool allow_os_buffer;
bool allow_mmap_reads;
bool allow_mmap_writes;
bool use_direct_reads;
+ bool use_direct_writes;
bool allow_fallocate;
bool is_fd_close_on_exec;
unsigned int stats_dump_period_sec;
namespace { // anonymous namespace
void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
- env_options->use_os_buffer = options.allow_os_buffer;
env_options->use_mmap_reads = options.allow_mmap_reads;
env_options->use_mmap_writes = options.allow_mmap_writes;
env_options->use_direct_reads = options.use_direct_reads;
+ env_options->use_direct_writes = options.use_direct_writes;
env_options->set_fd_cloexec = options.is_fd_close_on_exec;
env_options->bytes_per_sync = options.bytes_per_sync;
env_options->compaction_readahead_size = options.compaction_readahead_size;
#endif
#include <deque>
#include <set>
+#include <vector>
#include "port/port.h"
#include "rocksdb/slice.h"
#include "util/coding.h"
result->reset();
Status s;
int fd = -1;
+ int flags = O_CREAT | O_TRUNC;
+ // Direct IO mode with O_DIRECT flag or F_NOCACHE (MAC OSX)
+ if (options.use_direct_writes && !options.use_mmap_writes) {
+ // Note: we should avoid O_APPEND here due to the following bug:
+ // POSIX requires that opening a file with the O_APPEND flag should
+ // have no effect on the location at which pwrite() writes data.
+ // However, on Linux, if a file is opened with O_APPEND, pwrite()
+ // appends data to the end of the file, regardless of the value of
+ // offset.
+ // More info here: https://linux.die.net/man/2/pwrite
+ flags |= O_WRONLY;
+#ifndef OS_MACOSX
+ flags |= O_DIRECT;
+#endif
+ TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
+ } else if (options.use_mmap_writes) {
+ // non-direct I/O: mmap needs O_RDWR mode
+ flags |= O_RDWR;
+ } else {
+ flags |= O_WRONLY;
+ }
+
do {
IOSTATS_TIMER_GUARD(open_nanos);
- fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+ fd = open(fname.c_str(), flags, 0644);
} while (fd < 0 && errno == EINTR);
+
if (fd < 0) {
s = IOError(fname, errno);
- } else {
- SetFD_CLOEXEC(fd, &options);
- if (options.use_mmap_writes) {
- if (!checkedDiskForMmap_) {
- // this will be executed once in the program's lifetime.
- // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
- if (!SupportsFastAllocate(fname)) {
- forceMmapOff = true;
- }
- checkedDiskForMmap_ = true;
+ return s;
+ }
+ SetFD_CLOEXEC(fd, &options);
+
+ if (options.use_mmap_writes) {
+ if (!checkedDiskForMmap_) {
+ // this will be executed once in the program's lifetime.
+ // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
+ if (!SupportsFastAllocate(fname)) {
+ forceMmapOff_ = true;
}
+ checkedDiskForMmap_ = true;
}
- if (options.use_mmap_writes && !forceMmapOff) {
- result->reset(new PosixMmapFile(fname, fd, page_size_, options));
- } else if (options.use_direct_writes) {
- close(fd);
-#ifdef OS_MACOSX
- int flags = O_WRONLY | O_APPEND | O_TRUNC | O_CREAT;
-#else
- // Note: we should avoid O_APPEND here due to ta the following bug:
- // POSIX requires that opening a file with the O_APPEND flag should
- // have no affect on the location at which pwrite() writes data.
- // However, on Linux, if a file is opened with O_APPEND, pwrite()
- // appends data to the end of the file, regardless of the value of
- // offset.
- // More info here: https://linux.die.net/man/2/pwrite
- int flags = O_WRONLY | O_TRUNC | O_CREAT | O_DIRECT;
-#endif
- TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
- fd = open(fname.c_str(), flags, 0644);
- if (fd < 0) {
- s = IOError(fname, errno);
- } else {
- std::unique_ptr<PosixDirectIOWritableFile> file(
- new PosixDirectIOWritableFile(fname, fd));
- *result = std::move(file);
- s = Status::OK();
+ }
+ if (options.use_mmap_writes && !forceMmapOff_) {
+ result->reset(new PosixMmapFile(fname, fd, page_size_, options));
+ } else if (options.use_direct_writes && !options.use_mmap_writes) {
#ifdef OS_MACOSX
- if (fcntl(fd, F_NOCACHE, 1) == -1) {
- close(fd);
- s = IOError(fname, errno);
- }
-#endif
- }
- } else {
- // disable mmap writes
- EnvOptions no_mmap_writes_options = options;
- no_mmap_writes_options.use_mmap_writes = false;
-
- result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
+ if (fcntl(fd, F_NOCACHE, 1) == -1) {
+ // Build the status from fcntl()'s errno before close(), which may
+ // overwrite errno and make the reported error misleading.
+ s = IOError(fname, errno);
+ close(fd);
+ return s;
}
+#endif
+ result->reset(new PosixWritableFile(fname, fd, options));
+ } else {
+ // disable mmap writes
+ EnvOptions no_mmap_writes_options = options;
+ no_mmap_writes_options.use_mmap_writes = false;
+ result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
}
return s;
}
result->reset();
Status s;
int fd = -1;
+
+ int flags = 0;
+ // Direct IO mode with O_DIRECT flag or F_NOCACHE (MAC OSX)
+ if (options.use_direct_writes && !options.use_mmap_writes) {
+ flags |= O_WRONLY;
+#ifndef OS_MACOSX
+ flags |= O_DIRECT;
+#endif
+ TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
+ } else if (options.use_mmap_writes) {
+ // mmap needs O_RDWR mode
+ flags |= O_RDWR;
+ } else {
+ flags |= O_WRONLY;
+ }
+
do {
IOSTATS_TIMER_GUARD(open_nanos);
- fd = open(old_fname.c_str(), O_RDWR, 0644);
+ fd = open(old_fname.c_str(), flags, 0644);
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
s = IOError(fname, errno);
- } else {
- SetFD_CLOEXEC(fd, &options);
- // rename into place
- if (rename(old_fname.c_str(), fname.c_str()) != 0) {
- Status r = IOError(old_fname, errno);
- close(fd);
- return r;
- }
- if (options.use_mmap_writes) {
- if (!checkedDiskForMmap_) {
- // this will be executed once in the program's lifetime.
- // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
- if (!SupportsFastAllocate(fname)) {
- forceMmapOff = true;
- }
- checkedDiskForMmap_ = true;
+ return s;
+ }
+
+ SetFD_CLOEXEC(fd, &options);
+ // rename into place
+ if (rename(old_fname.c_str(), fname.c_str()) != 0) {
+ s = IOError(old_fname, errno);
+ close(fd);
+ return s;
+ }
+
+ if (options.use_mmap_writes) {
+ if (!checkedDiskForMmap_) {
+ // this will be executed once in the program's lifetime.
+ // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
+ if (!SupportsFastAllocate(fname)) {
+ forceMmapOff_ = true;
}
+ checkedDiskForMmap_ = true;
}
- if (options.use_mmap_writes && !forceMmapOff) {
- result->reset(new PosixMmapFile(fname, fd, page_size_, options));
- } else {
- // disable mmap writes
- EnvOptions no_mmap_writes_options = options;
- no_mmap_writes_options.use_mmap_writes = false;
-
- result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
+ }
+ if (options.use_mmap_writes && !forceMmapOff_) {
+ result->reset(new PosixMmapFile(fname, fd, page_size_, options));
+ } else if (options.use_direct_writes && !options.use_mmap_writes) {
+#ifdef OS_MACOSX
+ // O_DIRECT is not added to the open flags on OS_MACOSX; F_NOCACHE is set
+ // on the already-open descriptor instead.
+ if (fcntl(fd, F_NOCACHE, 1) == -1) {
+ // Build the status from fcntl()'s errno before close(), which may
+ // overwrite errno and make the reported error misleading.
+ s = IOError(fname, errno);
+ close(fd);
+ return s;
}
+#endif
+ result->reset(new PosixWritableFile(fname, fd, options));
+ } else {
+ // disable mmap writes
+ EnvOptions no_mmap_writes_options = options;
+ no_mmap_writes_options.use_mmap_writes = false;
+ result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
}
return s;
}
virtual Status NewRandomRWFile(const std::string& fname,
const DBOptions& db_options) const override {
EnvOptions optimized = env_options;
optimized.use_mmap_writes = false;
+ optimized.use_direct_writes = false;
optimized.bytes_per_sync = db_options.wal_bytes_per_sync;
// TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it
// breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
const EnvOptions& env_options) const override {
EnvOptions optimized = env_options;
optimized.use_mmap_writes = false;
+ optimized.use_direct_writes = false;
optimized.fallocate_with_keep_size = true;
return optimized;
}
private:
bool checkedDiskForMmap_;
- bool forceMmapOff; // do we override Env options?
-
+ bool forceMmapOff_; // do we override Env options?
// Returns true iff the named directory exists and is a directory.
virtual bool DirExists(const std::string& dname) {
PosixEnv::PosixEnv()
: checkedDiskForMmap_(false),
- forceMmapOff(false),
+ forceMmapOff_(false),
page_size_(getpagesize()),
thread_pools_(Priority::TOTAL) {
ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left);
}
- // Flush only when I/O is buffered
- if (use_os_buffer_ &&
- (buf_.Capacity() - buf_.CurrentSize()) < left) {
+ // Flush only when buffered I/O
+ if (!direct_io_ && (buf_.Capacity() - buf_.CurrentSize()) < left) {
if (buf_.CurrentSize() > 0) {
s = Flush();
if (!s.ok()) {
assert(buf_.CurrentSize() == 0);
}
- // We never write directly to disk with unbuffered I/O on.
+ // We never write directly to disk with direct I/O on.
// or we simply use it for its original purpose to accumulate many small
// chunks
- if (!use_os_buffer_ || (buf_.Capacity() >= left)) {
+ if (direct_io_ || (buf_.Capacity() >= left)) {
while (left > 0) {
size_t appended = buf_.Append(src, left);
left -= appended;
// We double the buffer here because
// Flush calls do not keep up with the incoming bytes
- // This is the only place when buffer is changed with unbuffered I/O
+ // This is the only place when buffer is changed with direct I/O
if (buf_.Capacity() < max_buffer_size_) {
size_t desiredCapacity = buf_.Capacity() * 2;
desiredCapacity = std::min(desiredCapacity, max_buffer_size_);
s = Flush(); // flush cache to OS
- // In unbuffered mode we write whole pages so
+ // In direct I/O mode we write whole pages so
// we need to let the file know where data ends.
Status interim = writable_file_->Truncate(filesize_);
if (!interim.ok() && s.ok()) {
return s;
}
-// write out the cached data to the OS cache
+// write out the cached data to the OS cache or storage if direct I/O
+// enabled
Status WritableFileWriter::Flush() {
Status s;
TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
rocksdb_kill_odds * REDUCE_ODDS2);
if (buf_.CurrentSize() > 0) {
- if (use_os_buffer_) {
- s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
+ if (direct_io_) {
+ s = WriteDirect();
} else {
- s = WriteUnbuffered();
+ s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
}
if (!s.ok()) {
return s;
if (align) {
// Here we may actually require more than burst and block
- // but we can not write less than one page at a time on unbuffered
- // thus we may want not to use ratelimiter s
+ // but we can not write less than one page at a time on direct I/O
+ // thus we may want not to use ratelimiter
size_t alignment = buf_.Alignment();
bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
}
// limiter if available
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
Status s;
- assert(use_os_buffer_);
+ assert(!direct_io_);
const char* src = data;
size_t left = size;
// whole number of pages to be written again on the next flush because we can
// only write on aligned
// offsets.
-Status WritableFileWriter::WriteUnbuffered() {
+Status WritableFileWriter::WriteDirect() {
Status s;
- assert(!use_os_buffer_);
+ assert(direct_io_);
const size_t alignment = buf_.Alignment();
assert((next_write_offset_ % alignment) == 0);
{
IOSTATS_TIMER_GUARD(write_nanos);
TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
- // Unbuffered writes must be positional
+ // direct writes must be positional
s = writable_file_->PositionedAppend(Slice(src, size), write_offset);
if (!s.ok()) {
buf_.Size(file_advance + leftover_tail);
uint64_t next_write_offset_;
bool pending_sync_;
const bool direct_io_;
- const bool use_os_buffer_;
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
next_write_offset_(0),
pending_sync_(false),
direct_io_(writable_file_->UseDirectIO()),
- use_os_buffer_(writable_file_->UseOSBuffer()),
last_sync_size_(0),
bytes_per_sync_(options.bytes_per_sync),
rate_limiter_(options.rate_limiter) {
private:
// Used when os buffering is OFF and we are writing
- // DMA such as in Windows unbuffered mode
- Status WriteUnbuffered();
+ // DMA such as in Direct I/O mode
+ Status WriteDirect();
// Normal write
Status WriteBuffered(const char* data, size_t size);
Status RangeSync(uint64_t offset, uint64_t nbytes);
TEST_F(WritableFileWriterTest, AppendStatusReturn) {
class FakeWF : public WritableFile {
public:
- explicit FakeWF() : use_os_buffer_(true), io_error_(false) {}
+ explicit FakeWF() : use_direct_io_(false), io_error_(false) {}
- virtual bool UseOSBuffer() const override { return use_os_buffer_; }
+ virtual bool UseDirectIO() const override { return use_direct_io_; }
Status Append(const Slice& data) override {
if (io_error_) {
return Status::IOError("Fake IO error");
Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); }
- void SetUseOSBuffer(bool val) { use_os_buffer_ = val; }
+ void SetUseDirectIO(bool val) { use_direct_io_ = val; }
void SetIOError(bool val) { io_error_ = val; }
protected:
- bool use_os_buffer_;
+ bool use_direct_io_;
bool io_error_;
};
unique_ptr<FakeWF> wf(new FakeWF());
- wf->SetUseOSBuffer(false);
+ wf->SetUseDirectIO(true);
unique_ptr<WritableFileWriter> writer(
new WritableFileWriter(std::move(wf), EnvOptions()));
: filename_(fname),
file_(f),
fd_(fileno(f)),
- use_os_buffer_(options.use_os_buffer) {}
+ use_direct_io_(options.use_direct_reads) {}
PosixSequentialFile::~PosixSequentialFile() { fclose(file_); }
s = IOError(filename_, errno);
}
}
- if (!use_os_buffer_) {
+ if (use_direct_io_) {
// we need to fadvise away the entire range of pages because
// we do not want readahead pages to be cached.
Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
*/
PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd,
const EnvOptions& options)
- : filename_(fname), fd_(fd), use_os_buffer_(options.use_os_buffer) {
+ : filename_(fname), fd_(fd), use_direct_io_(options.use_direct_reads) {
assert(!options.use_mmap_reads || sizeof(void*) < 8);
}
// An error: return a non-ok status
s = IOError(filename_, errno);
}
- if (!use_os_buffer_) {
+
+ if (use_direct_io_) {
// we need to fadvise away the entire range of pages because
// we do not want readahead pages to be cached.
Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); // free OS pages
: fd_(fd), filename_(fname), mmapped_region_(base), length_(length) {
fd_ = fd_ + 0; // suppress the warning for used variables
assert(options.use_mmap_reads);
- assert(options.use_os_buffer);
+ assert(!options.use_direct_reads);
}
PosixMmapReadableFile::~PosixMmapReadableFile() {
#endif
assert((page_size & (page_size - 1)) == 0);
assert(options.use_mmap_writes);
+ assert(!options.use_direct_writes);
}
PosixMmapFile::~PosixMmapFile() {
*/
PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
const EnvOptions& options)
- : filename_(fname), fd_(fd), filesize_(0) {
+ : filename_(fname),
+ direct_io_(options.use_direct_writes),
+ fd_(fd),
+ filesize_(0) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
allow_fallocate_ = options.allow_fallocate;
fallocate_with_keep_size_ = options.fallocate_with_keep_size;
}
Status PosixWritableFile::Append(const Slice& data) {
+ assert(!direct_io_|| (IsSectorAligned(data.size()) && IsPageAligned(data.data())));
const char* src = data.data();
size_t left = data.size();
while (left != 0) {
}
Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
+ assert(direct_io_ && IsSectorAligned(offset) &&
+ IsSectorAligned(data.size()) && IsPageAligned(data.data()));
assert(offset <= std::numeric_limits<off_t>::max());
const char* src = data.data();
size_t left = data.size();
offset += done;
src += done;
}
- filesize_ = offset + data.size();
+ filesize_ = offset;
return Status::OK();
}
uint64_t PosixWritableFile::GetFileSize() { return filesize_; }
Status PosixWritableFile::InvalidateCache(size_t offset, size_t length) {
+ if (direct_io_) {
+ return Status::OK();
+ }
#ifndef OS_LINUX
return Status::OK();
#else
}
#endif
-/*
- * PosixDirectIOWritableFile
- */
-Status PosixDirectIOWritableFile::Append(const Slice& data) {
- assert(IsSectorAligned(data.size()) && IsPageAligned(data.data()));
- if (!IsSectorAligned(data.size()) || !IsPageAligned(data.data())) {
- return Status::IOError("Unaligned buffer for direct IO");
- }
- return PosixWritableFile::Append(data);
-}
-
-Status PosixDirectIOWritableFile::PositionedAppend(const Slice& data,
- uint64_t offset) {
- assert(IsSectorAligned(offset));
- assert(IsSectorAligned(data.size()));
- assert(IsPageAligned(data.data()));
- if (!IsSectorAligned(offset) || !IsSectorAligned(data.size()) ||
- !IsPageAligned(data.data())) {
- return Status::IOError("offset or size is not aligned");
- }
- return PosixWritableFile::PositionedAppend(data, offset);
-}
-
/*
* PosixRandomRWFile
*/
std::string filename_;
FILE* file_;
int fd_;
- bool use_os_buffer_;
+ bool use_direct_io_;
public:
PosixSequentialFile(const std::string& fname, FILE* f,
protected:
std::string filename_;
int fd_;
- bool use_os_buffer_;
+ bool use_direct_io_;
public:
PosixRandomAccessFile(const std::string& fname, int fd,
class PosixWritableFile : public WritableFile {
protected:
const std::string filename_;
+ const bool direct_io_;
int fd_;
uint64_t filesize_;
#ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Sync() override;
virtual Status Fsync() override;
virtual bool IsSyncThreadSafe() const override;
+ virtual bool UseDirectIO() const override { return direct_io_; }
virtual uint64_t GetFileSize() override;
+ virtual size_t GetRequiredBufferAlignment() const override {
+ // TODO(gzh): It should be the logical sector size/filesystem block size
+ // hardcoded as 4k for most cases
+ return 4 * 1024;
+ }
virtual Status InvalidateCache(size_t offset, size_t length) override;
#ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(uint64_t offset, uint64_t len) override;
#endif
};
-class PosixDirectIOWritableFile : public PosixWritableFile {
- public:
- explicit PosixDirectIOWritableFile(const std::string& filename, int fd)
- : PosixWritableFile(filename, fd, EnvOptions()) {}
- virtual ~PosixDirectIOWritableFile() {}
-
- bool UseOSBuffer() const override { return false; }
- size_t GetRequiredBufferAlignment() const override { return 4 * 1024; }
- Status Append(const Slice& data) override;
- Status PositionedAppend(const Slice& data, uint64_t offset) override;
- bool UseDirectIO() const override { return true; }
- Status InvalidateCache(size_t offset, size_t length) override {
- return Status::OK();
- }
-};
-
+// mmap() based random-access
class PosixMmapReadableFile : public RandomAccessFile {
private:
int fd_;
void RunBenchmark() {
std::string file_name = test::TmpDir() + "/log_write_benchmark.log";
Env* env = Env::Default();
- EnvOptions env_options;
- env_options.use_mmap_writes = false;
+ EnvOptions env_options = env->OptimizeForLogWrite(EnvOptions());
env_options.bytes_per_sync = FLAGS_bytes_per_sync;
unique_ptr<WritableFile> file;
env->NewWritableFile(file_name, &file, env_options);
class WritableFileImpl : public WritableFile {
public:
- WritableFileImpl(FileState* file) : file_(file) {
- file_->Ref();
- }
+ explicit WritableFileImpl(FileState* file) : file_(file) { file_->Ref(); }
~WritableFileImpl() {
file_->Unref();
}
Status MockEnv::NewWritableFile(const std::string& fname,
- unique_ptr<WritableFile>* result,
- const EnvOptions& env_options) {
+ unique_ptr<WritableFile>* result,
+ const EnvOptions& env_options) {
auto fn = NormalizePath(fname);
MutexLock lock(&mutex_);
if (file_map_.find(fn) != file_map_.end()) {
WAL_ttl_seconds(0),
WAL_size_limit_MB(0),
manifest_preallocation_size(4 * 1024 * 1024),
- allow_os_buffer(true),
allow_mmap_reads(false),
allow_mmap_writes(false),
use_direct_reads(false),
+ use_direct_writes(false),
allow_fallocate(true),
is_fd_close_on_exec(true),
skip_log_error_on_recovery(false),
WAL_ttl_seconds(options.WAL_ttl_seconds),
WAL_size_limit_MB(options.WAL_size_limit_MB),
manifest_preallocation_size(options.manifest_preallocation_size),
- allow_os_buffer(options.allow_os_buffer),
allow_mmap_reads(options.allow_mmap_reads),
allow_mmap_writes(options.allow_mmap_writes),
use_direct_reads(options.use_direct_reads),
+ use_direct_writes(options.use_direct_writes),
allow_fallocate(options.allow_fallocate),
is_fd_close_on_exec(options.is_fd_close_on_exec),
skip_log_error_on_recovery(options.skip_log_error_on_recovery),
keep_log_file_num);
Header(log, " Options.recycle_log_file_num: %" ROCKSDB_PRIszt,
recycle_log_file_num);
- Header(log, " Options.allow_os_buffer: %d", allow_os_buffer);
- Header(log, " Options.allow_mmap_reads: %d", allow_mmap_reads);
Header(log, " Options.allow_fallocate: %d", allow_fallocate);
+ Header(log, " Options.allow_mmap_reads: %d", allow_mmap_reads);
Header(log, " Options.allow_mmap_writes: %d", allow_mmap_writes);
Header(log, " Options.use_direct_reads: %d", use_direct_reads);
+ Header(log, " Options.use_direct_writes: %d", use_direct_writes);
Header(log, " Options.create_missing_column_families: %d",
create_missing_column_families);
Header(log, " Options.db_log_dir: %s",
Header(log,
" Options.manifest_preallocation_size: %" ROCKSDB_PRIszt,
manifest_preallocation_size);
- Header(log, " Options.allow_os_buffer: %d",
- allow_os_buffer);
- Header(log, " Options.allow_mmap_reads: %d",
- allow_mmap_reads);
- Header(log, " Options.allow_mmap_writes: %d",
- allow_mmap_writes);
Header(log, " Options.is_fd_close_on_exec: %d",
is_fd_close_on_exec);
Header(log, " Options.stats_dump_period_sec: %u",
options.WAL_size_limit_MB = immutable_db_options.wal_size_limit_mb;
options.manifest_preallocation_size =
immutable_db_options.manifest_preallocation_size;
- options.allow_os_buffer = immutable_db_options.allow_os_buffer;
options.allow_mmap_reads = immutable_db_options.allow_mmap_reads;
options.allow_mmap_writes = immutable_db_options.allow_mmap_writes;
options.use_direct_reads = immutable_db_options.use_direct_reads;
+ options.use_direct_writes = immutable_db_options.use_direct_writes;
options.allow_fallocate = immutable_db_options.allow_fallocate;
options.is_fd_close_on_exec = immutable_db_options.is_fd_close_on_exec;
options.stats_dump_period_sec = immutable_db_options.stats_dump_period_sec;
{"use_direct_reads",
{offsetof(struct DBOptions, use_direct_reads), OptionType::kBoolean,
OptionVerificationType::kNormal, false, 0}},
+ {"use_direct_writes",
+ {offsetof(struct DBOptions, use_direct_writes), OptionType::kBoolean,
+ OptionVerificationType::kNormal, false, 0}},
{"allow_2pc",
{offsetof(struct DBOptions, allow_2pc), OptionType::kBoolean,
OptionVerificationType::kNormal, false, 0}},
{"allow_os_buffer",
- {offsetof(struct DBOptions, allow_os_buffer), OptionType::kBoolean,
- OptionVerificationType::kNormal, false, 0}},
+ {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, true, 0}},
{"create_if_missing",
{offsetof(struct DBOptions, create_if_missing), OptionType::kBoolean,
OptionVerificationType::kNormal, false, 0}},
"max_background_flushes=35;"
"create_if_missing=false;"
"error_if_exists=true;"
- "allow_os_buffer=false;"
"delayed_write_rate=4294976214;"
"manifest_preallocation_size=1222;"
"allow_mmap_writes=false;"
"allow_fallocate=true;"
"allow_mmap_reads=false;"
"use_direct_reads=false;"
+ "use_direct_writes=false;"
"max_log_file_size=4607;"
"random_access_max_buffer_size=1048576;"
"advise_random_on_open=true;"
{"WAL_ttl_seconds", "43"},
{"WAL_size_limit_MB", "44"},
{"manifest_preallocation_size", "45"},
- {"allow_os_buffer", "false"},
{"allow_mmap_reads", "true"},
{"allow_mmap_writes", "false"},
+ {"use_direct_reads", "false"},
+ {"use_direct_writes", "false"},
{"is_fd_close_on_exec", "true"},
{"skip_log_error_on_recovery", "false"},
{"stats_dump_period_sec", "46"},
ASSERT_EQ(new_db_opt.WAL_ttl_seconds, static_cast<uint64_t>(43));
ASSERT_EQ(new_db_opt.WAL_size_limit_MB, static_cast<uint64_t>(44));
ASSERT_EQ(new_db_opt.manifest_preallocation_size, 45U);
- ASSERT_EQ(new_db_opt.allow_os_buffer, false);
ASSERT_EQ(new_db_opt.allow_mmap_reads, true);
ASSERT_EQ(new_db_opt.allow_mmap_writes, false);
+ ASSERT_EQ(new_db_opt.use_direct_reads, false);
+ ASSERT_EQ(new_db_opt.use_direct_writes, false);
ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true);
ASSERT_EQ(new_db_opt.skip_log_error_on_recovery, false);
ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U);
db_opt->advise_random_on_open = rnd->Uniform(2);
db_opt->allow_mmap_reads = rnd->Uniform(2);
db_opt->allow_mmap_writes = rnd->Uniform(2);
- db_opt->allow_os_buffer = rnd->Uniform(2);
+ db_opt->use_direct_reads = rnd->Uniform(2);
+ db_opt->use_direct_writes = rnd->Uniform(2);
db_opt->create_if_missing = rnd->Uniform(2);
db_opt->create_missing_column_families = rnd->Uniform(2);
db_opt->disableDataSync = rnd->Uniform(2);
unique_ptr<SequentialFile> src_file;
EnvOptions env_options;
env_options.use_mmap_writes = false;
- env_options.use_os_buffer = false;
+ // TODO(gzh): maybe use direct writes here if possible
if (size != nullptr) {
*size = 0;
}
EnvOptions env_options;
env_options.use_mmap_writes = false;
- env_options.use_os_buffer = false;
+ env_options.use_direct_reads = false;
std::unique_ptr<SequentialFile> src_file;
Status s = src_env->NewSequentialFile(src, &src_file, env_options);
unique_ptr<WritableFile> backup_meta_file;
EnvOptions env_options;
env_options.use_mmap_writes = false;
+ env_options.use_direct_writes = false;
s = env_->NewWritableFile(meta_filename_ + ".tmp", &backup_meta_file,
env_options);
if (!s.ok()) {
public:
unique_ptr<SequentialFile> a_, b_;
std::string fname;
- SequentialFileMirror(std::string f) : fname(f) {}
+ explicit SequentialFileMirror(std::string f) : fname(f) {}
Status Read(size_t n, Slice* result, char* scratch) {
Slice aslice;
public:
unique_ptr<RandomAccessFile> a_, b_;
std::string fname;
- RandomAccessFileMirror(std::string f) : fname(f) {}
+ explicit RandomAccessFileMirror(std::string f) : fname(f) {}
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const {
Status as = a_->Read(offset, n, result, scratch);
public:
unique_ptr<WritableFile> a_, b_;
std::string fname;
- WritableFileMirror(std::string f) : fname(f) {}
+ explicit WritableFileMirror(std::string f) : fname(f) {}
Status Append(const Slice& data) override {
Status as = a_->Append(data);