id_, name_.c_str());
}
}
+
+ if (data_dirs_.size()) { // Explicitly close data directories
+ Status s = Status::OK();
+ for (auto& data_dir_ptr : data_dirs_) {
+ if (data_dir_ptr) {
+ s = data_dir_ptr->Close(IOOptions(), nullptr);
+ if (!s.ok()) {
+ ROCKS_LOG_WARN(ioptions_.logger, "Ignoring error %s",
+ s.ToString().c_str());
+ }
+ }
+ }
+ }
}
bool ColumnFamilyData::UnrefAndTryDelete() {
#endif
#include "util/file_checksum_helper.h"
#include "util/random.h"
+#include "utilities/counted_fs.h"
#include "utilities/fault_injection_env.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h"
ASSERT_EQ(env->GetCloseCount(), 3);
}
+TEST_F(DBBasicTest, DBCloseAllDirectoryFDs) {
+ Options options = GetDefaultOptions();
+ std::string dbname = test::PerThreadDBPath("db_close_all_dir_fds_test");
+ // Configure a specific WAL directory
+ options.wal_dir = dbname + "_wal_dir";
+ // Configure 3 different data directories
+ options.db_paths.emplace_back(dbname + "_1", 512 * 1024);
+ options.db_paths.emplace_back(dbname + "_2", 4 * 1024 * 1024);
+ options.db_paths.emplace_back(dbname + "_3", 1024 * 1024 * 1024);
+
+ ASSERT_OK(DestroyDB(dbname, options));
+
+ DB* db = nullptr;
+ std::unique_ptr<Env> env = NewCompositeEnv(
+ std::make_shared<CountedFileSystem>(FileSystem::Default()));
+ options.create_if_missing = true;
+ options.env = env.get();
+ Status s = DB::Open(options, dbname, &db);
+ ASSERT_OK(s);
+ ASSERT_TRUE(db != nullptr);
+
+ // Explicitly close the database to ensure the open and close counter for
+ // directories are equivalent
+ s = db->Close();
+ auto* counted_fs =
+ options.env->GetFileSystem()->CheckedCast<CountedFileSystem>();
+ assert(counted_fs);
+ ASSERT_TRUE(counted_fs->counters()->dir_opens ==
+ counted_fs->counters()->dir_closes);
+ ASSERT_OK(s);
+ delete db;
+}
+
TEST_F(DBBasicTest, DBCloseFlushError) {
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(env_));
write_buffer_manager_->RemoveDBFromQueue(wbm_stall_.get());
}
+ IOStatus io_s = directories_.Close(IOOptions(), nullptr /* dbg */);
+ if (!io_s.ok()) {
+ ret = io_s;
+ }
if (ret.IsAborted()) {
// Reserve IsAborted() error for those where users didn't release
// certain resource and they can release them and come back and
// retry. In this case, we wrap this exception to something else.
return Status::Incomplete(ret.ToString());
}
+
return ret;
}
s = dir_obj->FsyncWithDirOptions(IOOptions(), nullptr,
DirFsyncOptions(options_file_name));
}
+ if (s.ok()) {
+ s = dir_obj->Close(IOOptions(), nullptr);
+ }
}
if (s.ok()) {
InstrumentedMutexLock l(&mutex_);
FSDirectory* GetDbDir() { return db_dir_.get(); }
+ IOStatus Close(const IOOptions& options, IODebugContext* dbg) {
+ // close all directories for all database paths
+ IOStatus s = IOStatus::OK();
+ if (db_dir_) {
+ s = db_dir_->Close(options, dbg);
+ }
+
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (wal_dir_) {
+ s = wal_dir_->Close(options, dbg);
+ }
+
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (data_dirs_.size() > 0 && s.ok()) {
+ for (auto& data_dir_ptr : data_dirs_) {
+ if (data_dir_ptr) {
+ s = data_dir_ptr->Close(options, dbg);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ }
+ }
+
+ return s;
+ }
+
private:
std::unique_ptr<FSDirectory> db_dir_;
std::vector<std::unique_ptr<FSDirectory>> data_dirs_;
~NoopDirectory() {}
Status Fsync() override { return Status::OK(); }
+ Status Close() override { return Status::OK(); }
};
result->reset(new NoopDirectory());
IODebugContext dbg;
return target_->FsyncWithDirOptions(io_opts, &dbg, DirFsyncOptions());
}
+
+ Status Close() override {
+ IOOptions io_opts;
+ IODebugContext dbg;
+ return target_->Close(io_opts, &dbg);
+ }
+
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->Fsync());
}
+ IOStatus Close(const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return status_to_io_status(target_->Close());
+ }
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
if (fd < 0) {
return IOError("While open directory", name, errno);
} else {
- result->reset(new PosixDirectory(fd));
+ result->reset(new PosixDirectory(fd, name));
}
return IOStatus::OK();
}
// The magic number for BTRFS is fixed, if it's not defined, define it here
#define BTRFS_SUPER_MAGIC 0x9123683E
#endif
-PosixDirectory::PosixDirectory(int fd) : fd_(fd) {
+PosixDirectory::PosixDirectory(int fd, const std::string& directory_name)
+ : fd_(fd), directory_name_(directory_name) {
is_btrfs_ = false;
#ifdef OS_LINUX
struct statfs buf;
#endif
}
-PosixDirectory::~PosixDirectory() { close(fd_); }
+PosixDirectory::~PosixDirectory() {
+ if (fd_ >= 0) {
+ IOStatus s = PosixDirectory::Close(IOOptions(), nullptr);
+ s.PermitUncheckedError();
+ }
+}
IOStatus PosixDirectory::Fsync(const IOOptions& opts, IODebugContext* dbg) {
return FsyncWithDirOptions(opts, dbg, DirFsyncOptions());
}
+IOStatus PosixDirectory::Close(const IOOptions& /*opts*/,
+ IODebugContext* /*dbg*/) {
+ IOStatus s = IOStatus::OK();
+ if (close(fd_) < 0) {
+ s = IOError("While closing directory ", directory_name_, errno);
+ } else {
+ fd_ = -1;
+ }
+ return s;
+}
+
IOStatus PosixDirectory::FsyncWithDirOptions(
const IOOptions& /*opts*/, IODebugContext* /*dbg*/,
const DirFsyncOptions& dir_fsync_options) {
}
// fallback to dir-fsync for kDefault, kDirRenamed and kFileDeleted
}
+
+ // skip fsync/fcntl when fd_ == -1 since this file descriptor has been closed
+ // in either the de-construction or the close function, data must have been
+ // fsync-ed before de-construction and close is called
#ifdef HAVE_FULLFSYNC
// btrfs is a Linux file system, while currently F_FULLFSYNC is available on
// Mac OS.
assert(!is_btrfs_);
- if (::fcntl(fd_, F_FULLFSYNC) < 0) {
+ if (fd_ != -1 && ::fcntl(fd_, F_FULLFSYNC) < 0) {
return IOError("while fcntl(F_FULLFSYNC)", "a directory", errno);
}
#else // HAVE_FULLFSYNC
- if (fsync(fd_) == -1) {
+ if (fd_ != -1 && fsync(fd_) == -1) {
s = IOError("While fsync", "a directory", errno);
}
#endif // HAVE_FULLFSYNC
class PosixDirectory : public FSDirectory {
public:
- explicit PosixDirectory(int fd);
+ explicit PosixDirectory(int fd, const std::string& directory_name);
~PosixDirectory();
virtual IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
+ virtual IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
+
virtual IOStatus FsyncWithDirOptions(
const IOOptions&, IODebugContext*,
const DirFsyncOptions& dir_fsync_options) override;
private:
int fd_;
bool is_btrfs_;
+ const std::string directory_name_;
};
} // namespace ROCKSDB_NAMESPACE
IODebugContext* /*dbg*/) override {
return IOStatus::OK();
}
+
+ IOStatus Close(const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) override {
+ return IOStatus::OK();
+ }
};
class MockEnvFileLock : public FileLock {
s = dir_obj->FsyncWithDirOptions(IOOptions(), nullptr,
DirFsyncOptions(identify_file_name));
}
+ if (s.ok()) {
+ s = dir_obj->Close(IOOptions(), nullptr);
+ }
if (!s.ok()) {
env->DeleteFile(tmp).PermitUncheckedError();
}
virtual ~Directory() {}
// Fsync directory. Can be called concurrently from multiple threads.
virtual Status Fsync() = 0;
+ // Close directory.
+ virtual Status Close() = 0;
virtual size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const {
return 0;
explicit DirectoryWrapper(Directory* target) : target_(target) {}
Status Fsync() override { return target_->Fsync(); }
+ Status Close() override { return target_->Close(); }
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
return Fsync(options, dbg);
}
+ // Close directory
+ virtual IOStatus Close(const IOOptions& options, IODebugContext* dbg) = 0;
+
virtual size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const {
return 0;
}
return target_->FsyncWithDirOptions(options, dbg, dir_fsync_options);
}
+ IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
+ return target_->Close(options, dbg);
+ }
+
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
return s;
}
- result->reset(new WinDirectory(handle));
+ result->reset(new WinDirectory(name, handle));
return s;
}
return IOStatus::OK();
}
+IOStatus WinDirectory::Close(const IOOptions& /*options*/,
+ IODebugContext* /*dbg*/) {
+ IOStatus s = IOStatus::OK();
+ BOOL ret __attribute__((__unused__));
+ if (handle_ != INVALID_HANDLE_VALUE) {
+ ret = ::CloseHandle(handle_);
+ if (!ret) {
+ auto lastError = GetLastError();
+ s = IOErrorFromWindowsError("Directory closes failed for : " + GetName(),
+ lastError);
+ }
+ handle_ = NULL;
+ }
+ return s;
+}
+
size_t WinDirectory::GetUniqueId(char* id, size_t max_size) const {
return GetUniqueIdFromFile(handle_, id, max_size);
}
};
class WinDirectory : public FSDirectory {
+ const std::string filename_;
HANDLE handle_;
public:
- explicit WinDirectory(HANDLE h) noexcept : handle_(h) {
+ explicit WinDirectory(const std::string& filename, HANDLE h) noexcept
+ : filename_(filename), handle_(h) {
assert(handle_ != INVALID_HANDLE_VALUE);
}
- ~WinDirectory() { ::CloseHandle(handle_); }
+ ~WinDirectory() {
+ if (handle_ != NULL) {
+ IOStatus s = WinDirectory::Close(IOOptions(), nullptr);
+ s.PermitUncheckedError();
+ }
+ }
+ const std::string& GetName() const { return filename_; }
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override;
+ IOStatus Close(const IOOptions& options, IODebugContext* dbg) override;
size_t GetUniqueId(char* id, size_t max_size) const override;
};
for (auto& pair : cf_handles_) {
delete pair.second;
}
+ Status s = db_->Close();
+ s.PermitUncheckedError();
delete db_;
db_ = nullptr;
}
return rv;
}
+ IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
+ IOStatus rv = FSDirectoryWrapper::Close(options, dbg);
+ if (rv.ok()) {
+ fs_->counters()->closes++;
+ fs_->counters()->dir_closes++;
+ }
+ return rv;
+ }
+
IOStatus FsyncWithDirOptions(const IOOptions& options, IODebugContext* dbg,
const DirFsyncOptions& dir_options) override {
IOStatus rv =
ss << "Num Dir Fsync(): " << dsyncs.load(std::memory_order_relaxed)
<< std::endl;
ss << "Num Close(): " << closes.load(std::memory_order_relaxed) << std::endl;
+ ss << "Num Dir Open(): " << dir_opens.load(std::memory_order_relaxed)
+ << std::endl;
+ ss << "Num Dir Close(): " << dir_closes.load(std::memory_order_relaxed)
+ << std::endl;
ss << "Num Read(): " << reads.ops.load(std::memory_order_relaxed)
<< std::endl;
ss << "Num Append(): " << writes.ops.load(std::memory_order_relaxed)
IOStatus s = target()->NewDirectory(name, options, &base, dbg);
if (s.ok()) {
counters_.opens++;
+ counters_.dir_opens++;
result->reset(new CountedDirectory(std::move(base), this));
}
return s;
std::atomic<int> syncs;
std::atomic<int> dsyncs;
std::atomic<int> fsyncs;
+ std::atomic<int> dir_opens;
+ std::atomic<int> dir_closes;
OpCounter reads;
OpCounter writes;
flushes(0),
syncs(0),
dsyncs(0),
- fsyncs(0) {}
+ fsyncs(0),
+ dir_opens(0),
+ dir_closes(0) {}
void Reset() {
opens = 0;
syncs = 0;
dsyncs = 0;
fsyncs = 0;
+ dir_opens = 0;
+ dir_closes = 0;
reads.Reset();
writes.Reset();
}
return dir_->Fsync();
}
+Status TestDirectory::Close() {
+ if (!env_->IsFilesystemActive()) {
+ return env_->GetError();
+ }
+ return dir_->Close();
+}
+
TestRandomAccessFile::TestRandomAccessFile(
std::unique_ptr<RandomAccessFile>&& target, FaultInjectionTestEnv* env)
: target_(std::move(target)), env_(env) {
~TestDirectory() {}
virtual Status Fsync() override;
+ virtual Status Close() override;
private:
FaultInjectionTestEnv* env_;
return s;
}
+IOStatus TestFSDirectory::Close(const IOOptions& options, IODebugContext* dbg) {
+ if (!fs_->IsFilesystemActive()) {
+ return fs_->GetError();
+ }
+ IOStatus s = dir_->Close(options, dbg);
+ return s;
+}
+
IOStatus TestFSDirectory::FsyncWithDirOptions(
const IOOptions& options, IODebugContext* dbg,
const DirFsyncOptions& dir_fsync_options) {
virtual IOStatus Fsync(const IOOptions& options,
IODebugContext* dbg) override;
+ virtual IOStatus Close(const IOOptions& options,
+ IODebugContext* dbg) override;
+
virtual IOStatus FsyncWithDirOptions(
const IOOptions& options, IODebugContext* dbg,
const DirFsyncOptions& dir_fsync_options) override;