out.append(ss);
} else if (command == "bluefs files list") {
const char* devnames[3] = {"wal","db","slow"};
- std::lock_guard l(bluefs->nodes.lock);
+ std::lock_guard l(bluefs->lock);
f->open_array_section("files");
- for (auto &d : bluefs->nodes.dir_map) {
+ for (auto &d : bluefs->dir_map) {
std::string dir = d.first;
for (auto &r : d.second->file_map) {
f->open_object_section("file");
delete logger;
}
-//AK - TODO - locking needed but not certain
void BlueFS::_update_logger_stats()
{
// we must be holding the lock
- logger->set(l_bluefs_num_files, nodes.file_map.size());
- logger->set(l_bluefs_log_bytes, log.writer->file->fnode.size);
+ logger->set(l_bluefs_num_files, file_map.size());
+ logger->set(l_bluefs_log_bytes, log_writer->file->fnode.size);
if (alloc[BDEV_WAL]) {
logger->set(l_bluefs_wal_total_bytes, _get_total(BDEV_WAL));
uint64_t BlueFS::get_used()
{
+ std::lock_guard l(lock);
uint64_t used = 0;
for (unsigned id = 0; id < MAX_BDEV; ++id) {
used += _get_used(id);
{
ceph_assert(id < alloc.size());
ceph_assert(alloc[id]);
+ std::lock_guard l(lock);
return _get_used(id);
}
uint64_t BlueFS::get_total(unsigned id)
{
+ std::lock_guard l(lock);
return _get_total(id);
}
uint64_t BlueFS::get_free(unsigned id)
{
+ std::lock_guard l(lock);
ceph_assert(id < alloc.size());
return alloc[id]->get_free();
}
int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)
{
- std::lock_guard nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " bdev " << id << dendl;
ceph_assert(id < alloc.size());
- for (auto& p : nodes.file_map) {
+ for (auto& p : file_map) {
for (auto& q : p.second->fnode.extents) {
if (q.bdev == id) {
extents->insert(q.offset, q.length);
int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
{
+ std::unique_lock l(lock);
dout(1) << __func__
<< " osd_uuid " << osd_uuid
<< dendl;
&log_file->fnode);
vselector->add_usage(log_file->vselector_hint, log_file->fnode);
ceph_assert(r == 0);
- log.writer = _create_writer(log_file);
+ log_writer = _create_writer(log_file);
// initial txn
- ceph_assert(log.seq_live == 1);
- log.t.seq = 1;
- log.t.op_init();
- _flush_and_sync_log_LD();
+ log_t.op_init();
+ _flush_and_sync_log(l);
// write supers
super.log_fnode = log_file->fnode;
super.memorized_layout = layout;
_write_super(BDEV_DB);
- _flush_bdev();
+ flush_bdev();
// clean up
super = bluefs_super_t();
- _close_writer(log.writer);
- log.writer = NULL;
+ _close_writer(log_writer);
+ log_writer = NULL;
vselector.reset(nullptr);
_stop_alloc();
_shutdown_logger();
}
// init freelist
- for (auto& p : nodes.file_map) {
+ for (auto& p : file_map) {
dout(30) << __func__ << " noting alloc for " << p.second->fnode << dendl;
for (auto& q : p.second->fnode.extents) {
bool is_shared = is_shared_alloc(q.bdev);
}
// set up the log for future writes
- log.writer = _create_writer(_get_file(1));
- ceph_assert(log.writer->file->fnode.ino == 1);
- log.writer->pos = log.writer->file->fnode.size;
+ log_writer = _create_writer(_get_file(1));
+ ceph_assert(log_writer->file->fnode.ino == 1);
+ log_writer->pos = log_writer->file->fnode.size;
dout(10) << __func__ << " log write pos set to 0x"
- << std::hex << log.writer->pos << std::dec
+ << std::hex << log_writer->pos << std::dec
<< dendl;
return 0;
sync_metadata(avoid_compact);
- _close_writer(log.writer);
- log.writer = NULL;
+ _close_writer(log_writer);
+ log_writer = NULL;
vselector.reset(nullptr);
_stop_alloc();
- nodes.file_map.clear();
- nodes.dir_map.clear();
+ file_map.clear();
+ dir_map.clear();
super = bluefs_super_t();
- log.t.clear();
+ log_t.clear();
_shutdown_logger();
}
new_log_dev_cur = BDEV_NEWDB;
new_log_dev_next = BDEV_DB;
}
- _rewrite_log_and_layout_sync_LN_LD(false,
+ _rewrite_log_and_layout_sync(false,
BDEV_NEWDB,
new_log_dev_cur,
new_log_dev_next,
layout);
//}
} else if(id == BDEV_NEWWAL) {
- _rewrite_log_and_layout_sync_LN_LD(false,
+ _rewrite_log_and_layout_sync(false,
BDEV_DB,
BDEV_NEWWAL,
BDEV_WAL,
int BlueFS::fsck()
{
+ std::lock_guard l(lock);
dout(1) << __func__ << dendl;
// hrm, i think we check everything on mount...
return 0;
{
dout(10) << __func__ << (noop ? " NO-OP" : "") << dendl;
ino_last = 1; // by the log
- uint64_t log_seq = 0;
+ log_seq = 0;
FileRef log_file;
log_file = _get_file(1);
int r = _read(log_reader, read_pos, super.block_size,
&bl, NULL);
if (r != (int)super.block_size && cct->_conf->bluefs_replay_recovery) {
- r += _do_replay_recovery_read(log_reader, pos, read_pos + r, super.block_size - r, &bl);
+ r += do_replay_recovery_read(log_reader, pos, read_pos + r, super.block_size - r, &bl);
}
assert(r == (int)super.block_size);
read_pos += r;
<< ", which is past eof" << dendl;
if (cct->_conf->bluefs_replay_recovery) {
//try to search for more data
- r += _do_replay_recovery_read(log_reader, pos, read_pos + r, more - r, &t);
+ r += do_replay_recovery_read(log_reader, pos, read_pos + r, more - r, &t);
if (r < (int)more) {
//in normal mode we must read r==more, for recovery it is too strict
break;
<< std::endl;
}
- ceph_assert(next_seq > log_seq);
+ ceph_assert(next_seq >= log_seq);
log_seq = next_seq - 1; // we will increment it below
uint64_t skip = offset - read_pos;
if (skip) {
<< ": op_jump_seq " << next_seq << std::endl;
}
- ceph_assert(next_seq > log_seq);
+ ceph_assert(next_seq >= log_seq);
log_seq = next_seq - 1; // we will increment it below
}
break;
if (!noop) {
FileRef file = _get_file(ino);
ceph_assert(file->fnode.ino);
- map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
- ceph_assert(q != nodes.dir_map.end());
+ map<string,DirRef>::iterator q = dir_map.find(dirname);
+ ceph_assert(q != dir_map.end());
map<string,FileRef>::iterator r = q->second->file_map.find(filename);
ceph_assert(r == q->second->file_map.end());
}
if (!noop) {
- map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
- ceph_assert(q != nodes.dir_map.end());
+ map<string,DirRef>::iterator q = dir_map.find(dirname);
+ ceph_assert(q != dir_map.end());
map<string,FileRef>::iterator r = q->second->file_map.find(filename);
ceph_assert(r != q->second->file_map.end());
ceph_assert(r->second->refs > 0);
}
if (!noop) {
- map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
- ceph_assert(q == nodes.dir_map.end());
- nodes.dir_map[dirname] = ceph::make_ref<Dir>();
+ map<string,DirRef>::iterator q = dir_map.find(dirname);
+ ceph_assert(q == dir_map.end());
+ dir_map[dirname] = ceph::make_ref<Dir>();
}
}
break;
}
if (!noop) {
- map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
- ceph_assert(q != nodes.dir_map.end());
+ map<string,DirRef>::iterator q = dir_map.find(dirname);
+ ceph_assert(q != dir_map.end());
ceph_assert(q->second->file_map.empty());
- nodes.dir_map.erase(q);
+ dir_map.erase(q);
}
}
break;
}
if (!noop) {
- auto p = nodes.file_map.find(ino);
- ceph_assert(p != nodes.file_map.end());
+ auto p = file_map.find(ino);
+ ceph_assert(p != file_map.end());
vselector->sub_usage(p->second->vselector_hint, p->second->fnode);
if (cct->_conf->bluefs_log_replay_check_allocations) {
int r = _check_allocations(p->second->fnode,
return r;
}
}
- nodes.file_map.erase(p);
+ file_map.erase(p);
}
}
break;
}
if (!noop) {
vselector->add_usage(log_file->vselector_hint, log_file->fnode);
- log.seq_live = log_seq + 1;
- dirty.seq_live = log_seq + 1;
- log.t.seq = log.seq_live;
- //log.seq_stable = log_seq;
- dirty.seq_stable = log_seq;
}
dout(10) << __func__ << " log file size was 0x"
if (!noop) {
// verify file link counts are all >0
- for (auto& p : nodes.file_map) {
+ for (auto& p : file_map) {
if (p.second->refs == 0 &&
p.second->fnode.ino > 1) {
derr << __func__ << " file with link count 0: " << p.second->fnode
int BlueFS::log_dump()
{
// only dump log file's content
- ceph_assert(log.writer == nullptr && "cannot log_dump on mounted BlueFS");
+ ceph_assert(log_writer == nullptr && "cannot log_dump on mounted BlueFS");
int r = _open_super();
if (r < 0) {
derr << __func__ << " failed to open super: " << cpp_strerror(r) << dendl;
dout(0) << __func__ << " super to be written to " << dev_target << dendl;
}
- for (auto& [ino, file_ref] : nodes.file_map) {
+ for (auto& [ino, file_ref] : file_map) {
//do not copy log
if (file_ref->fnode.ino == 1) {
continue;
new_log_dev_next;
}
- _rewrite_log_and_layout_sync_LN_LD(
+ _rewrite_log_and_layout_sync(
false,
(flags & REMOVE_DB) ? BDEV_SLOW : BDEV_DB,
new_log_dev_cur,
flags |= devs_source.count(BDEV_WAL) ? REMOVE_WAL : 0;
int dev_target_new = dev_target; //FIXME: remove, makes no sense
- for (auto& p : nodes.file_map) {
+ for (auto& p : file_map) {
//do not copy log
if (p.second->fnode.ino == 1) {
continue;
BDEV_DB :
BDEV_SLOW;
- _rewrite_log_and_layout_sync_LN_LD(
+ _rewrite_log_and_layout_sync(
false,
super_dev,
new_log_dev_cur,
BlueFS::FileRef BlueFS::_get_file(uint64_t ino)
{
- auto p = nodes.file_map.find(ino);
- if (p == nodes.file_map.end()) {
+ auto p = file_map.find(ino);
+ if (p == file_map.end()) {
FileRef f = ceph::make_ref<File>();
- nodes.file_map[ino] = f;
+ file_map[ino] = f;
dout(30) << __func__ << " ino " << ino << " = " << f
<< " (new)" << dendl;
return f;
}
}
-void BlueFS::_drop_link_D(FileRef file)
+void BlueFS::_drop_link(FileRef file)
{
dout(20) << __func__ << " had refs " << file->refs
<< " on " << file->fnode << dendl;
ceph_assert(file->refs > 0);
- ceph_assert(ceph_mutex_is_locked(log.lock));
-
--file->refs;
if (file->refs == 0) {
dout(20) << __func__ << " destroying " << file->fnode << dendl;
ceph_assert(file->num_reading.load() == 0);
vselector->sub_usage(file->vselector_hint, file->fnode);
- log.t.op_file_remove(file->fnode.ino);
-
- std::lock_guard dl(dirty.lock);
+ log_t.op_file_remove(file->fnode.ino);
for (auto& r : file->fnode.extents) {
pending_release[r.bdev].insert(r.offset, r.length);
}
- nodes.file_map.erase(file->fnode.ino);
+ file_map.erase(file->fnode.ino);
file->deleted = true;
- if (file->dirty_seq > dirty.seq_stable) {
- // retract request to serialize changes
- ceph_assert(dirty.files.count(file->dirty_seq));
- auto it = dirty.files[file->dirty_seq].iterator_to(*file);
- dirty.files[file->dirty_seq].erase(it);
- file->dirty_seq = dirty.seq_stable;
+ if (file->dirty_seq) {
+ ceph_assert(file->dirty_seq > log_seq_stable);
+ ceph_assert(dirty_files.count(file->dirty_seq));
+ auto it = dirty_files[file->dirty_seq].iterator_to(*file);
+ dirty_files[file->dirty_seq].erase(it);
+ file->dirty_seq = 0;
}
}
}
return ret;
}
-void BlueFS::invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
+void BlueFS::_invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
{
- std::lock_guard l(f->lock);
dout(10) << __func__ << " file " << f->fnode
<< " 0x" << std::hex << offset << "~" << length << std::dec
<< dendl;
}
}
-uint64_t BlueFS::_estimate_log_size_N()
+uint64_t BlueFS::_estimate_log_size()
{
- std::lock_guard nl(nodes.lock);
int avg_dir_size = 40; // fixme
int avg_file_size = 12;
uint64_t size = 4096 * 2;
- size += nodes.file_map.size() * (1 + sizeof(bluefs_fnode_t));
- size += nodes.dir_map.size() + (1 + avg_dir_size);
- size += nodes.file_map.size() * (1 + avg_dir_size + avg_file_size);
+ size += file_map.size() * (1 + sizeof(bluefs_fnode_t));
+ size += dir_map.size() + (1 + avg_dir_size);
+ size += file_map.size() * (1 + avg_dir_size + avg_file_size);
return round_up_to(size, super.block_size);
}
void BlueFS::compact_log()
{
+ std::unique_lock<ceph::mutex> l(lock);
if (!cct->_conf->bluefs_replay_recovery_disable_compact) {
if (cct->_conf->bluefs_compact_log_sync) {
- _compact_log_sync_LN_LD();
+ _compact_log_sync();
} else {
- _compact_log_async_LD_NF_D();
+ _compact_log_async(l);
}
}
}
-bool BlueFS::_should_start_compact_log_L_N()
+bool BlueFS::_should_compact_log()
{
- if (log_is_compacting.load() == true) {
- // compaction is already running
- return false;
- }
- uint64_t current;
- {
- std::lock_guard ll(log.lock);
- current = log.writer->file->fnode.size;
- }
- uint64_t expected = _estimate_log_size_N();
+ uint64_t current = log_writer->file->fnode.size;
+ uint64_t expected = _estimate_log_size();
float ratio = (float)current / (float)expected;
dout(10) << __func__ << " current 0x" << std::hex << current
<< " expected " << expected << std::dec
<< " ratio " << ratio
+ << (new_log ? " (async compaction in progress)" : "")
<< dendl;
- if (current < cct->_conf->bluefs_log_compact_min_size ||
+ if (new_log ||
+ current < cct->_conf->bluefs_log_compact_min_size ||
ratio < cct->_conf->bluefs_log_compact_min_ratio) {
return false;
}
return true;
}
-void BlueFS::_compact_log_dump_metadata_N(bluefs_transaction_t *t,
+void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t,
int flags)
{
- std::lock_guard nl(nodes.lock);
-
t->seq = 1;
t->uuid = super.uuid;
dout(20) << __func__ << " op_init" << dendl;
t->op_init();
- for (auto& [ino, file_ref] : nodes.file_map) {
+ for (auto& [ino, file_ref] : file_map) {
if (ino == 1)
continue;
ceph_assert(ino > 1);
- //AK - TODO - touching fnode - need to lock
+
for(auto& e : file_ref->fnode.extents) {
auto bdev = e.bdev;
auto bdev_new = bdev;
dout(20) << __func__ << " op_file_update " << file_ref->fnode << dendl;
t->op_file_update(file_ref->fnode);
}
- for (auto& [path, dir_ref] : nodes.dir_map) {
- dout(20) << __func__ << " op_dir_create " << path << dendl;
- t->op_dir_create(path);
- for (auto& [fname, file_ref] : dir_ref->file_map) {
- dout(20) << __func__ << " op_dir_link " << path << "/" << fname
- << " to " << file_ref->fnode.ino << dendl;
- t->op_dir_link(path, fname, file_ref->fnode.ino);
- }
- }
-}
-/* Streams to t files modified before *capture_before_seq* and all dirs */
-void BlueFS::compact_log_async_dump_metadata_NF(bluefs_transaction_t *t,
- uint64_t capture_before_seq)
-{
- std::lock_guard nl(nodes.lock);
-
- t->seq = 1;
- t->uuid = super.uuid;
- dout(20) << __func__ << " op_init" << dendl;
-
- t->op_init();
- for (auto& [ino, file_ref] : nodes.file_map) {
- if (ino == 1)
- continue;
- ceph_assert(ino > 1);
- std::lock_guard fl(file_ref->lock);
- if (file_ref->dirty_seq < capture_before_seq) {
- dout(20) << __func__ << " op_file_update " << file_ref->fnode << dendl;
- t->op_file_update(file_ref->fnode);
- } else {
- dout(20) << __func__ << " skipping just modified, dirty_seq="
- << file_ref->dirty_seq << " " << file_ref->fnode << dendl;
- }
- }
- for (auto& [path, dir_ref] : nodes.dir_map) {
+ for (auto& [path, dir_ref] : dir_map) {
dout(20) << __func__ << " op_dir_create " << path << dendl;
t->op_dir_create(path);
for (auto& [fname, file_ref] : dir_ref->file_map) {
}
}
-void BlueFS::_compact_log_sync_LN_LD()
+void BlueFS::_compact_log_sync()
{
dout(10) << __func__ << dendl;
auto prefer_bdev =
- vselector->select_prefer_bdev(log.writer->file->vselector_hint);
- _rewrite_log_and_layout_sync_LN_LD(true,
+ vselector->select_prefer_bdev(log_writer->file->vselector_hint);
+ _rewrite_log_and_layout_sync(true,
BDEV_DB,
prefer_bdev,
prefer_bdev,
logger->inc(l_bluefs_log_compactions);
}
-void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback,
- int super_dev,
- int log_dev,
- int log_dev_new,
- int flags,
- std::optional<bluefs_layout_t> layout)
+void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback,
+ int super_dev,
+ int log_dev,
+ int log_dev_new,
+ int flags,
+ std::optional<bluefs_layout_t> layout)
{
- std::lock_guard ll(log.lock);
+ File *log_file = log_writer->file.get();
- File *log_file = log.writer->file.get();
-
- // log.t.seq is always set to current live seq
- ceph_assert(log.t.seq == log.seq_live);
- // Capturing entire state. Dump anything that has been stored there.
- log.t.clear();
- log.t.seq = log.seq_live;
- // From now on, no changes to log.t are permitted until we finish rewriting log.
- // Can allow dirty to remain dirty - log.seq_live will not change.
+ // clear out log (be careful who calls us!!!)
+ log_t.clear();
dout(20) << __func__ << " super_dev:" << super_dev
<< " log_dev:" << log_dev
<< " flags:" << flags
<< dendl;
bluefs_transaction_t t;
- _compact_log_dump_metadata_N(&t, flags);
+ _compact_log_dump_metadata(&t, flags);
- dout(20) << __func__ << " op_jump_seq " << log.seq_live << dendl;
- t.op_jump_seq(log.seq_live);
+ dout(20) << __func__ << " op_jump_seq " << log_seq << dendl;
+ t.op_jump_seq(log_seq);
bufferlist bl;
encode(t, bl);
}
}
- _close_writer(log.writer);
+ _close_writer(log_writer);
log_file->fnode.size = bl.length();
vselector->sub_usage(log_file->vselector_hint, old_fnode);
vselector->add_usage(log_file->vselector_hint, log_file->fnode);
- log.writer = _create_writer(log_file);
- log.writer->append(bl);
- r = _flush_special(log.writer);
+ log_writer = _create_writer(log_file);
+ log_writer->append(bl);
+ r = _flush(log_writer, true);
ceph_assert(r == 0);
#ifdef HAVE_LIBAIO
if (!cct->_conf->bluefs_sync_write) {
list<aio_t> completed_ios;
- _claim_completed_aios(log.writer, &completed_ios);
- _wait_for_aio(log.writer);
+ _claim_completed_aios(log_writer, &completed_ios);
+ wait_for_aio(log_writer);
completed_ios.clear();
}
#endif
- _flush_bdev();
+ flush_bdev();
super.memorized_layout = layout;
super.log_fnode = log_file->fnode;
++super.version;
_write_super(super_dev);
- _flush_bdev();
+ flush_bdev();
dout(10) << __func__ << " release old log extents " << old_fnode.extents << dendl;
- std::lock_guard dl(dirty.lock);
for (auto& r : old_fnode.extents) {
pending_release[r.bdev].insert(r.offset, r.length);
}
*
* 8. Release the old log space. Clean up.
*/
-
-void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer
+void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
{
dout(10) << __func__ << dendl;
- // only one compaction allowed at one time
- bool old_is_comp = std::atomic_exchange(&log_is_compacting, true);
- if (old_is_comp) {
- dout(10) << __func__ << " ongoing" <<dendl;
- return;
- }
-
- log.lock.lock();
- File *log_file = log.writer->file.get();
- FileWriter *new_log_writer = nullptr;
- FileRef new_log = nullptr;
- uint64_t new_log_jump_to = 0;
- uint64_t old_log_jump_to = 0;
+ File *log_file = log_writer->file.get();
+ ceph_assert(!new_log);
+ ceph_assert(!new_log_writer);
+ // create a new log [writer] so that we know compaction is in progress
+ // (see _should_compact_log)
new_log = ceph::make_ref<File>();
- new_log->fnode.ino = 0; // we use _flush_special to avoid log of the fnode
-
- // Part 1.
- // Prepare current log for jumping into it.
- // 1. Allocate extent
- // 2. Update op to log
- // 3. Jump op to log
- // During that, no one else can write to log, otherwise we risk jumping backwards.
- // We need to sync log, because we are injecting discontinuity, and writer is not prepared for that.
+ new_log->fnode.ino = 0; // so that _flush_range won't try to log the fnode
- //signal _maybe_extend_log that expansion of log is temporary inacceptable
- bool old_forbidden = atomic_exchange(&log_forbidden_to_expand, true);
- ceph_assert(old_forbidden == false);
+ // 0. wait for any racing flushes to complete. (We do not want to block
+ // in _flush_sync_log with jump_to set or else a racing thread might flush
+ // our entries and our jump_to update won't be correct.)
+ while (log_flushing) {
+ dout(10) << __func__ << " log is currently flushing, waiting" << dendl;
+ log_cond.wait(l);
+ }
vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
- // 1.1 allocate new log space and jump to it.
+ // 1. allocate new log space and jump to it.
old_log_jump_to = log_file->fnode.get_allocated();
- uint64_t runway = log_file->fnode.get_allocated() - log.writer->get_effective_write_pos();
dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
<< " need 0x" << (old_log_jump_to + cct->_conf->bluefs_max_log_runway) << std::dec << dendl;
int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint),
// update the log file change and log a jump to the offset where we want to
// write the new entries
- log.t.op_file_update(log_file->fnode);
- // jump to new position should mean next seq
- log.t.op_jump(log.seq_live + 1, old_log_jump_to);
- uint64_t seq_now = log.seq_live;
- // we need to flush all bdev because we will be streaming all dirty files to log
- // TODO - think - if _flush_and_sync_log_jump will not add dirty files nor release pending allocations
- // then flush_bdev() will not be necessary
- _flush_bdev();
- _flush_and_sync_log_jump_D(old_log_jump_to, runway);
-
- log.lock.unlock();
- // out of jump section - now log can be used to write to
+ log_t.op_file_update(log_file->fnode);
+ log_t.op_jump(log_seq, old_log_jump_to);
+
+ flush_bdev(); // FIXME?
+
+ _flush_and_sync_log(l, 0, old_log_jump_to);
// 2. prepare compacted log
bluefs_transaction_t t;
- compact_log_async_dump_metadata_NF(&t, seq_now);
+ //avoid record two times in log_t and _compact_log_dump_metadata.
+ log_t.clear();
+ _compact_log_dump_metadata(&t, 0);
+
uint64_t max_alloc_size = std::max(alloc_size[BDEV_WAL],
std::max(alloc_size[BDEV_DB],
alloc_size[BDEV_SLOW]));
// conservative estimate for final encoded size
new_log_jump_to = round_up_to(t.op_bl.length() + super.block_size * 2,
max_alloc_size);
- //newly constructed log head will jump to what we had before
- t.op_jump(seq_now, new_log_jump_to);
+ t.op_jump(log_seq, new_log_jump_to);
// allocate
//FIXME: check if we want DB here?
ceph_assert(r == 0);
// we might have some more ops in log_t due to _allocate call
- // t.claim_ops(log_t); no we no longer track allocations in log
+ t.claim_ops(log_t);
bufferlist bl;
encode(t, bl);
<< std::dec << dendl;
new_log_writer = _create_writer(new_log);
-
new_log_writer->append(bl);
- new_log->lock.lock();
- new_log_writer->lock.lock();
+
// 3. flush
- r = _flush_special(new_log_writer);
+ r = _flush(new_log_writer, true);
ceph_assert(r == 0);
// 4. wait
- _flush_bdev(new_log_writer);
- new_log_writer->lock.unlock();
- new_log->lock.unlock();
+ _flush_bdev_safely(new_log_writer);
+
// 5. update our log fnode
// discard first old_log_jump_to extents
// swap the log files. New log file is the log file now.
new_log->fnode.swap_extents(log_file->fnode);
- log.writer->pos = log.writer->file->fnode.size =
- log.writer->pos - old_log_jump_to + new_log_jump_to;
+ log_writer->pos = log_writer->file->fnode.size =
+ log_writer->pos - old_log_jump_to + new_log_jump_to;
vselector->add_usage(log_file->vselector_hint, log_file->fnode);
++super.version;
_write_super(BDEV_DB);
- _flush_bdev();
-
- old_forbidden = atomic_exchange(&log_forbidden_to_expand, false);
- ceph_assert(old_forbidden == true);
- //to wake up if someone was in need of expanding log
- log_cond.notify_all();
+ lock.unlock();
+ flush_bdev();
+ lock.lock();
// 7. release old space
dout(10) << __func__ << " release old log extents " << old_extents << dendl;
- {
- std::lock_guard dl(dirty.lock);
- for (auto& r : old_extents) {
- pending_release[r.bdev].insert(r.offset, r.length);
- }
+ for (auto& r : old_extents) {
+ pending_release[r.bdev].insert(r.offset, r.length);
}
// delete the new log, remove from the dirty files list
_close_writer(new_log_writer);
+ if (new_log->dirty_seq) {
+ ceph_assert(dirty_files.count(new_log->dirty_seq));
+ auto it = dirty_files[new_log->dirty_seq].iterator_to(*new_log);
+ dirty_files[new_log->dirty_seq].erase(it);
+ }
new_log_writer = nullptr;
new_log = nullptr;
log_cond.notify_all();
dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
logger->inc(l_bluefs_log_compactions);
-
- old_is_comp = atomic_exchange(&log_is_compacting, false);
- ceph_assert(old_is_comp);
}
void BlueFS::_pad_bl(bufferlist& bl)
}
}
-/**
- * Adds file modifications from `dirty.files` to bluefs transactions
- * already stored in `log.t`. Writes them to disk and waits until are stable.
- * Guarantees that file modifications with `want_seq` are already stable on disk.
- * In addition may insert jump forward transaction to log write position `jump_to`.
- *
- * it should lock ability to:
- * 1) add to log.t
- * 2) modify dirty.files
- * 3) add to pending_release
- *
- * pending_release and log.t go with same lock
- */
-// Returns log seq that was live before advance.
-uint64_t BlueFS::_log_advance_seq()
+int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
+ uint64_t want_seq,
+ uint64_t jump_to)
{
- ceph_assert(ceph_mutex_is_locked(dirty.lock));
- ceph_assert(ceph_mutex_is_locked(log.lock));
- //acquire new seq
- // this will became seq_stable once we write
- ceph_assert(dirty.seq_stable < dirty.seq_live);
- ceph_assert(log.t.seq == log.seq_live);
- uint64_t seq = log.seq_live;
- log.t.uuid = super.uuid;
-
- ++dirty.seq_live;
- ++log.seq_live;
- ceph_assert(dirty.seq_live == log.seq_live);
- return seq;
-}
+ while (log_flushing) {
+ dout(10) << __func__ << " want_seq " << want_seq
+ << " log is currently flushing, waiting" << dendl;
+ ceph_assert(!jump_to);
+ log_cond.wait(l);
+ }
+ if (want_seq && want_seq <= log_seq_stable) {
+ dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
+ << log_seq_stable << ", done" << dendl;
+ ceph_assert(!jump_to);
+ return 0;
+ }
+ if (log_t.empty() && dirty_files.empty()) {
+ dout(10) << __func__ << " want_seq " << want_seq
+ << " " << log_t << " not dirty, dirty_files empty, no-op" << dendl;
+ ceph_assert(!jump_to);
+ return 0;
+ }
+ vector<interval_set<uint64_t>> to_release(pending_release.size());
+ to_release.swap(pending_release);
-// Adds to log.t file modifications mentioned in `dirty.files`.
-// Note: some bluefs ops may have already been stored in log.t transaction.
-void BlueFS::_consume_dirty(uint64_t seq)
-{
- ceph_assert(ceph_mutex_is_locked(dirty.lock));
- ceph_assert(ceph_mutex_is_locked(log.lock));
+ uint64_t seq = log_t.seq = ++log_seq;
+ ceph_assert(want_seq == 0 || want_seq <= seq);
+ log_t.uuid = super.uuid;
// log dirty files
- // we just incremented log_seq. It is now illegal to add to dirty.files[log_seq]
- auto lsi = dirty.files.find(seq);
- if (lsi != dirty.files.end()) {
- dout(20) << __func__ << " " << lsi->second.size() << " dirty.files" << dendl;
+ auto lsi = dirty_files.find(seq);
+ if (lsi != dirty_files.end()) {
+ dout(20) << __func__ << " " << lsi->second.size() << " dirty_files" << dendl;
for (auto &f : lsi->second) {
dout(20) << __func__ << " op_file_update " << f.fnode << dendl;
- log.t.op_file_update(f.fnode);
+ log_t.op_file_update(f.fnode);
}
}
-}
-// Extends log if its free space is smaller then bluefs_min_log_runway.
-// Returns space available *BEFORE* adding new space. Signed for additional <0 detection.
-int64_t BlueFS::_maybe_extend_log()
-{
- ceph_assert(ceph_mutex_is_locked(log.lock));
+ dout(10) << __func__ << " " << log_t << dendl;
+ ceph_assert(!log_t.empty());
+
// allocate some more space (before we run out)?
- // BTW: this triggers `flush()` in the `page_aligned_appender` of `log.writer`.
- int64_t runway = log.writer->file->fnode.get_allocated() -
- log.writer->get_effective_write_pos();
+ // BTW: this triggers `flush()` in the `page_aligned_appender` of `log_writer`.
+ int64_t runway = log_writer->file->fnode.get_allocated() -
+ log_writer->get_effective_write_pos();
+ bool just_expanded_log = false;
if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) {
dout(10) << __func__ << " allocating more log runway (0x"
<< std::hex << runway << std::dec << " remaining)" << dendl;
- /*
- * Usually, when we are low on space in log, we just allocate new extent,
- * put update op(log) to log and we are fine.
- * Problem - it interferes with log compaction:
- * New log produced in compaction will include - as last op - jump into some offset (anchor) of current log.
- * It is assumed that log region (anchor - end) will contain all changes made by bluefs since
- * full state capture into new log.
- * Putting log update into (anchor - end) region is illegal, because any update there must be compatible with
- * both logs, but old log is different then new log.
- *
- * Possible solutions:
- * - stall extending log until we finish compacting and switch log (CURRENT)
- * - re-run compaction with more runway for old log
- * - add OP_FILE_ADDEXT that adds extent; will be compatible with both logs
- */
- if (log_forbidden_to_expand.load() == true) {
- return -EWOULDBLOCK;
+ while (new_log_writer) {
+ dout(10) << __func__ << " waiting for async compaction" << dendl;
+ log_cond.wait(l);
}
- vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
+ vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
int r = _allocate(
- vselector->select_prefer_bdev(log.writer->file->vselector_hint),
+ vselector->select_prefer_bdev(log_writer->file->vselector_hint),
cct->_conf->bluefs_max_log_runway,
- &log.writer->file->fnode);
+ &log_writer->file->fnode);
ceph_assert(r == 0);
- vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
- log.t.op_file_update(log.writer->file->fnode);
+ vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
+ log_t.op_file_update(log_writer->file->fnode);
+ just_expanded_log = true;
}
- return runway;
-}
-
-void BlueFS::_flush_and_sync_log_core(int64_t runway)
-{
- ceph_assert(ceph_mutex_is_locked(log.lock));
- dout(10) << __func__ << " " << log.t << dendl;
bufferlist bl;
bl.reserve(super.block_size);
- encode(log.t, bl);
+ encode(log_t, bl);
// pad to block boundary
size_t realign = super.block_size - (bl.length() % super.block_size);
if (realign && realign != super.block_size)
logger->inc(l_bluefs_logged_bytes, bl.length());
- if (true) {
+ if (just_expanded_log) {
ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss
- // transaction will not fit extents before growth -> data loss on _replay
}
- log.writer->append(bl);
+ log_writer->append(bl);
- // prepare log for new transactions
- log.t.clear();
- log.t.seq = log.seq_live;
+ log_t.clear();
+ log_t.seq = 0; // just so debug output is less confusing
+ log_flushing = true;
- int r = _flush_special(log.writer);
+ int r = _flush(log_writer, true);
ceph_assert(r == 0);
-}
-// Clears dirty.files up to (including) seq_stable.
-void BlueFS::_clear_dirty_set_stable_D(uint64_t seq)
-{
- std::lock_guard dl(dirty.lock);
+ if (jump_to) {
+ dout(10) << __func__ << " jumping log offset from 0x" << std::hex
+ << log_writer->pos << " -> 0x" << jump_to << std::dec << dendl;
+ log_writer->pos = jump_to;
+ vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
+ log_writer->file->fnode.size = jump_to;
+ vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
+ }
+
+ _flush_bdev_safely(log_writer);
+
+ log_flushing = false;
+ log_cond.notify_all();
// clean dirty files
- if (seq > dirty.seq_stable) {
- dirty.seq_stable = seq;
- dout(20) << __func__ << " seq_stable " << dirty.seq_stable << dendl;
-
- // undirty all files that were already streamed to log
- auto p = dirty.files.begin();
- while (p != dirty.files.end()) {
- if (p->first > dirty.seq_stable) {
+ if (seq > log_seq_stable) {
+ log_seq_stable = seq;
+ dout(20) << __func__ << " log_seq_stable " << log_seq_stable << dendl;
+
+ auto p = dirty_files.begin();
+ while (p != dirty_files.end()) {
+ if (p->first > log_seq_stable) {
dout(20) << __func__ << " done cleaning up dirty files" << dendl;
break;
}
auto l = p->second.begin();
while (l != p->second.end()) {
File *file = &*l;
- ceph_assert(file->dirty_seq <= dirty.seq_stable);
+ ceph_assert(file->dirty_seq > 0);
+ ceph_assert(file->dirty_seq <= log_seq_stable);
dout(20) << __func__ << " cleaned file " << file->fnode << dendl;
- file->dirty_seq = dirty.seq_stable;
+ file->dirty_seq = 0;
p->second.erase(l++);
}
ceph_assert(p->second.empty());
- dirty.files.erase(p++);
+ dirty_files.erase(p++);
}
} else {
- dout(20) << __func__ << " seq_stable " << dirty.seq_stable
+ dout(20) << __func__ << " log_seq_stable " << log_seq_stable
<< " already >= out seq " << seq
<< ", we lost a race against another log flush, done" << dendl;
}
-}
-void BlueFS::_release_pending_allocations(vector<interval_set<uint64_t>>& to_release)
-{
for (unsigned i = 0; i < to_release.size(); ++i) {
if (!to_release[i].empty()) {
/* OK, now we have the guarantee alloc[i] won't be null. */
}
}
}
-}
-
-int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq)
-{
- int64_t available_runway;
- do {
- log.lock.lock();
- dirty.lock.lock();
- if (want_seq && want_seq <= dirty.seq_stable) {
- dout(10) << __func__ << " want_seq " << want_seq << " <= seq_stable "
- << dirty.seq_stable << ", done" << dendl;
- dirty.lock.unlock();
- log.lock.unlock();
- return 0;
- }
-
- available_runway = _maybe_extend_log();
- if (available_runway == -EWOULDBLOCK) {
- // we are in need of adding runway, but we are during log-switch from compaction
- dirty.lock.unlock();
- //instead log.lock_unlock() do move ownership
- std::unique_lock<ceph::mutex> ll(log.lock, std::adopt_lock);
- while (log_forbidden_to_expand.load()) {
- log_cond.wait(ll);
- }
- } else {
- ceph_assert(available_runway >= 0);
- }
- } while (available_runway < 0);
-
- ceph_assert(want_seq == 0 || want_seq <= dirty.seq_live); // illegal to request seq that was not created yet
- uint64_t seq =_log_advance_seq();
- _consume_dirty(seq);
- vector<interval_set<uint64_t>> to_release(pending_release.size());
- to_release.swap(pending_release);
- dirty.lock.unlock();
-
- _flush_and_sync_log_core(available_runway);
- _flush_bdev(log.writer);
- //now log.lock is no longer needed
- log.lock.unlock();
-
- _clear_dirty_set_stable_D(seq);
- _release_pending_allocations(to_release);
_update_logger_stats();
- return 0;
-}
-
-// Flushes log and immediately adjusts log_writer pos.
-int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to,
- int64_t available_runway)
-{
- ceph_assert(ceph_mutex_is_locked(log.lock));
-
- ceph_assert(jump_to);
- // we synchronize writing to log, by lock to log.lock
- dirty.lock.lock();
- uint64_t seq =_log_advance_seq();
- _consume_dirty(seq);
- vector<interval_set<uint64_t>> to_release(pending_release.size());
- to_release.swap(pending_release);
- dirty.lock.unlock();
- _flush_and_sync_log_core(available_runway);
-
- dout(10) << __func__ << " jumping log offset from 0x" << std::hex
- << log.writer->pos << " -> 0x" << jump_to << std::dec << dendl;
- log.writer->pos = jump_to;
- vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode.size);
- log.writer->file->fnode.size = jump_to;
- vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode.size);
-
- _flush_bdev(log.writer);
-
- _clear_dirty_set_stable_D(seq);
- _release_pending_allocations(to_release);
-
- _update_logger_stats();
return 0;
}
const unsigned length,
const bluefs_super_t& super)
{
- ceph_assert(ceph_mutex_is_locked(this->lock) || file->fnode.ino == 1);
ceph::bufferlist bl;
if (partial) {
tail_block.splice(0, tail_block.length(), &bl);
return bl;
}
-int BlueFS::_signal_dirty_to_log_D(FileWriter *h)
+int BlueFS::_signal_dirty_to_log(FileWriter *h)
{
- ceph_assert(ceph_mutex_is_locked(h->lock));
- std::lock_guard dl(dirty.lock);
h->file->fnode.mtime = ceph_clock_now();
ceph_assert(h->file->fnode.ino >= 1);
- if (h->file->dirty_seq <= dirty.seq_stable) {
- h->file->dirty_seq = dirty.seq_live;
- dirty.files[h->file->dirty_seq].push_back(*h->file);
- dout(20) << __func__ << " dirty_seq = " << dirty.seq_live
+ if (h->file->dirty_seq == 0) {
+ h->file->dirty_seq = log_seq + 1;
+ dirty_files[h->file->dirty_seq].push_back(*h->file);
+ dout(20) << __func__ << " dirty_seq = " << log_seq + 1
<< " (was clean)" << dendl;
} else {
- if (h->file->dirty_seq != dirty.seq_live) {
+ if (h->file->dirty_seq != log_seq + 1) {
// need re-dirty, erase from list first
- ceph_assert(dirty.files.count(h->file->dirty_seq));
- auto it = dirty.files[h->file->dirty_seq].iterator_to(*h->file);
- dirty.files[h->file->dirty_seq].erase(it);
- h->file->dirty_seq = dirty.seq_live;
- dirty.files[h->file->dirty_seq].push_back(*h->file);
- dout(20) << __func__ << " dirty_seq = " << dirty.seq_live
+ ceph_assert(dirty_files.count(h->file->dirty_seq));
+ auto it = dirty_files[h->file->dirty_seq].iterator_to(*h->file);
+ dirty_files[h->file->dirty_seq].erase(it);
+ h->file->dirty_seq = log_seq + 1;
+ dirty_files[h->file->dirty_seq].push_back(*h->file);
+ dout(20) << __func__ << " dirty_seq = " << log_seq + 1
<< " (was " << h->file->dirty_seq << ")" << dendl;
} else {
- dout(20) << __func__ << " dirty_seq = " << dirty.seq_live
+ dout(20) << __func__ << " dirty_seq = " << log_seq + 1
<< " (unchanged, do nothing) " << dendl;
}
}
return 0;
}
-void BlueFS::flush_range/*WF*/(FileWriter *h, uint64_t offset, uint64_t length) {
- std::unique_lock hl(h->lock);
- _flush_range_F(h, offset, length);
-}
-
-int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length)
+int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
{
- ceph_assert(ceph_mutex_is_locked(h->lock));
dout(10) << __func__ << " " << h << " pos 0x" << std::hex << h->pos
<< " 0x" << offset << "~" << length << std::dec
<< " to " << h->file->fnode << dendl;
ceph_assert(h->file->num_readers.load() == 0);
- bool buffered = cct->_conf->bluefs_buffered_io;
- ceph_assert(h->file->fnode.ino > 1);
+ bool buffered;
+ if (h->file->fnode.ino == 1)
+ buffered = false;
+ else
+ buffered = cct->_conf->bluefs_buffered_io;
if (offset + length <= h->pos)
return 0;
<< std::hex << offset << "~" << length << std::dec
<< dendl;
}
- std::lock_guard file_lock(h->file->lock);
ceph_assert(offset <= h->file->fnode.size);
uint64_t allocated = h->file->fnode.get_allocated();
vselector->sub_usage(h->file->vselector_hint, h->file->fnode);
// do not bother to dirty the file if we are overwriting
// previously allocated extents.
+
if (allocated < offset + length) {
// we should never run out of log space here; see the min runway check
// in _flush_and_sync_log.
+ ceph_assert(h->file->fnode.ino != 1);
int r = _allocate(vselector->select_prefer_bdev(h->file->vselector_hint),
offset + length - allocated,
&h->file->fnode);
}
if (h->file->fnode.size < offset + length) {
h->file->fnode.size = offset + length;
- h->file->is_dirty = true;
+ if (h->file->fnode.ino > 1) {
+ // we do not need to dirty the log file (or it's compacting
+ // replacement) when the file size changes because replay is
+ // smart enough to discover it on its own.
+ h->file->is_dirty = true;
+ }
}
-
dout(20) << __func__ << " file now, unflushed " << h->file->fnode << dendl;
- return _flush_data(h, offset, length, buffered);
-}
-int BlueFS::_flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered)
-{
- if (h->file->fnode.ino != 1) {
- ceph_assert(ceph_mutex_is_locked(h->lock));
- ceph_assert(ceph_mutex_is_locked(h->file->lock));
- }
uint64_t x_off = 0;
auto p = h->file->fnode.seek(offset, &x_off);
ceph_assert(p != h->file->fnode.extents.end());
dout(10) << __func__ << " got " << ls->size() << " aios" << dendl;
}
-void BlueFS::_wait_for_aio(FileWriter *h)
+void BlueFS::wait_for_aio(FileWriter *h)
{
// NOTE: this is safe to call without a lock, as long as our reference is
// stable.
}
#endif
-void BlueFS::append_try_flush(FileWriter *h, const char* buf, size_t len)
-{
- bool flushed_sum = false;
- {
- std::unique_lock hl(h->lock);
- size_t max_size = 1ull << 30; // cap to 1GB
- while (len > 0) {
- bool need_flush = true;
- auto l0 = h->get_buffer_length();
- if (l0 < max_size) {
- size_t l = std::min(len, max_size - l0);
- h->append(buf, l);
- buf += l;
- len -= l;
- need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size;
- }
- if (need_flush) {
- bool flushed = false;
- int r = _flush_F(h, true, &flushed);
- ceph_assert(r == 0);
- flushed_sum |= flushed;
- // make sure we've made any progress with flush hence the
- // loop doesn't iterate forever
- ceph_assert(h->get_buffer_length() < max_size);
- }
- }
- }
- if (flushed_sum) {
- _maybe_compact_log_LN_NF_LD_D();
- }
-}
-
-void BlueFS::flush(FileWriter *h, bool force)
+int BlueFS::_flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l)
{
bool flushed = false;
- int r;
- {
- std::unique_lock hl(h->lock);
- r = _flush_F(h, force, &flushed);
- ceph_assert(r == 0);
- }
+ int r = _flush(h, force, &flushed);
if (r == 0 && flushed) {
- _maybe_compact_log_LN_NF_LD_D();
+ _maybe_compact_log(l);
}
+ return r;
}
-int BlueFS::_flush_F(FileWriter *h, bool force, bool *flushed)
+int BlueFS::_flush(FileWriter *h, bool force, bool *flushed)
{
- ceph_assert(ceph_mutex_is_locked(h->lock));
uint64_t length = h->get_buffer_length();
uint64_t offset = h->pos;
if (flushed) {
<< std::hex << offset << "~" << length << std::dec
<< " to " << h->file->fnode << dendl;
ceph_assert(h->pos <= h->file->fnode.size);
- int r = _flush_range_F(h, offset, length);
+ int r = _flush_range(h, offset, length);
if (flushed) {
*flushed = true;
}
return r;
}
-// Flush for bluefs special files.
-// Does not add extents to h.
-// Does not mark h as dirty.
-// we do not need to dirty the log file (or it's compacting
-// replacement) when the file size changes because replay is
-// smart enough to discover it on its own.
-int BlueFS::_flush_special(FileWriter *h)
+int BlueFS::_truncate(FileWriter *h, uint64_t offset)
{
- ceph_assert(h->file->fnode.ino <= 1);
- uint64_t length = h->get_buffer_length();
- uint64_t offset = h->pos;
- ceph_assert(length + offset <= h->file->fnode.get_allocated());
- if (h->file->fnode.size < offset + length) {
- h->file->fnode.size = offset + length;
- }
- return _flush_data(h, offset, length, false);
-}
-
-int BlueFS::truncate(FileWriter *h, uint64_t offset)
-{
- std::lock_guard hl(h->lock);
dout(10) << __func__ << " 0x" << std::hex << offset << std::dec
<< " file " << h->file->fnode << dendl;
if (h->file->deleted) {
ceph_abort_msg("actually this shouldn't happen");
}
if (h->get_buffer_length()) {
- int r = _flush_F(h, true);
+ int r = _flush(h, true);
if (r < 0)
return r;
}
vselector->sub_usage(h->file->vselector_hint, h->file->fnode.size);
h->file->fnode.size = offset;
vselector->add_usage(h->file->vselector_hint, h->file->fnode.size);
- std::lock_guard ll(log.lock);
- log.t.op_file_update(h->file->fnode);
+ log_t.op_file_update(h->file->fnode);
return 0;
}
-int BlueFS::fsync(FileWriter *h)
+int BlueFS::_fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l)
{
- std::unique_lock hl(h->lock);
- uint64_t old_dirty_seq = 0;
- {
- dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl;
- int r = _flush_F(h, true);
- if (r < 0)
- return r;
- _flush_bdev(h);
- if (h->file->is_dirty) {
- _signal_dirty_to_log_D(h);
- h->file->is_dirty = false;
- }
- {
- std::lock_guard dl(dirty.lock);
- if (dirty.seq_stable < h->file->dirty_seq) {
- old_dirty_seq = h->file->dirty_seq;
- dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq
- << ") on " << h->file->fnode << ", flushing log" << dendl;
- }
- }
+ dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl;
+ int r = _flush(h, true);
+ if (r < 0)
+ return r;
+ if (h->file->is_dirty) {
+ _signal_dirty_to_log(h);
+ h->file->is_dirty = false;
}
+ uint64_t old_dirty_seq = h->file->dirty_seq;
+
+ _flush_bdev_safely(h);
+
if (old_dirty_seq) {
- _flush_and_sync_log_LD(old_dirty_seq);
+ uint64_t s = log_seq;
+ dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq
+ << ") on " << h->file->fnode << ", flushing log" << dendl;
+ _flush_and_sync_log(l, old_dirty_seq);
+ ceph_assert(h->file->dirty_seq == 0 || // cleaned
+ h->file->dirty_seq > s); // or redirtied by someone else
}
- _maybe_compact_log_LN_NF_LD_D();
return 0;
}
-// be careful - either h->file->lock or log.lock must be taken
-void BlueFS::_flush_bdev(FileWriter *h)
+void BlueFS::_flush_bdev_safely(FileWriter *h)
{
- if (h->file->fnode.ino != 1) {
- ceph_assert(ceph_mutex_is_locked(h->lock));
- } else {
- ceph_assert(ceph_mutex_is_locked(log.lock));
- }
std::array<bool, MAX_BDEV> flush_devs = h->dirty_devs;
h->dirty_devs.fill(false);
#ifdef HAVE_LIBAIO
if (!cct->_conf->bluefs_sync_write) {
list<aio_t> completed_ios;
_claim_completed_aios(h, &completed_ios);
- _wait_for_aio(h);
+ lock.unlock();
+ wait_for_aio(h);
completed_ios.clear();
- }
+ flush_bdev(flush_devs);
+ lock.lock();
+ } else
#endif
- _flush_bdev(flush_devs);
+ {
+ lock.unlock();
+ flush_bdev(flush_devs);
+ lock.lock();
+ }
}
-void BlueFS::_flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs)
+void BlueFS::flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs)
{
// NOTE: this is safe to call without a lock.
dout(20) << __func__ << dendl;
}
}
-void BlueFS::_flush_bdev()
+void BlueFS::flush_bdev()
{
// NOTE: this is safe to call without a lock.
dout(20) << __func__ << dendl;
return 0;
}
-int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len)
+int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
{
- std::lock_guard ll(log.lock);
- std::lock_guard fl(f->lock);
dout(10) << __func__ << " file " << f->fnode << " 0x"
<< std::hex << off << "~" << len << std::dec << dendl;
if (f->deleted) {
vselector->add_usage(f->vselector_hint, f->fnode);
if (r < 0)
return r;
- log.t.op_file_update(f->fnode);
+ log_t.op_file_update(f->fnode);
}
return 0;
}
void BlueFS::sync_metadata(bool avoid_compact)
{
- bool can_skip_flush;
- {
- std::lock_guard ll(log.lock);
- std::lock_guard dl(dirty.lock);
- can_skip_flush = log.t.empty() && dirty.files.empty();
- }
- if (can_skip_flush) {
+ std::unique_lock l(lock);
+ if (log_t.empty() && dirty_files.empty()) {
dout(10) << __func__ << " - no pending log events" << dendl;
} else {
utime_t start;
lgeneric_subdout(cct, bluefs, 10) << __func__;
start = ceph_clock_now();
*_dout << dendl;
- _flush_bdev(); // FIXME?
- _flush_and_sync_log_LD();
+ flush_bdev(); // FIXME?
+ _flush_and_sync_log(l);
dout(10) << __func__ << " done in " << (ceph_clock_now() - start) << dendl;
}
if (!avoid_compact) {
- _maybe_compact_log_LN_NF_LD_D();
+ _maybe_compact_log(l);
}
}
-void BlueFS::_maybe_compact_log_LN_NF_LD_D()
+void BlueFS::_maybe_compact_log(std::unique_lock<ceph::mutex>& l)
{
if (!cct->_conf->bluefs_replay_recovery_disable_compact &&
- _should_start_compact_log_L_N()) {
+ _should_compact_log()) {
if (cct->_conf->bluefs_compact_log_sync) {
- _compact_log_sync_LN_LD();
+ _compact_log_sync();
} else {
- _compact_log_async_LD_NF_D();
+ _compact_log_async(l);
}
}
}
FileWriter **h,
bool overwrite)
{
- std::unique_lock nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
- map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
DirRef dir;
- if (p == nodes.dir_map.end()) {
+ if (p == dir_map.end()) {
// implicitly create the dir
dout(20) << __func__ << " dir " << dirname
<< " does not exist" << dendl;
}
file = ceph::make_ref<File>();
file->fnode.ino = ++ino_last;
- nodes.file_map[ino_last] = file;
+ file_map[ino_last] = file;
dir->file_map[string{filename}] = file;
++file->refs;
create = true;
dout(20) << __func__ << " mapping " << dirname << "/" << filename
<< " vsel_hint " << file->vselector_hint
<< dendl;
- nl.unlock();
- {
- std::lock_guard ll(log.lock);
- log.t.op_file_update(file->fnode);
- if (create)
- log.t.op_dir_link(dirname, filename, file->fnode.ino);
- }
+
+ log_t.op_file_update(file->fnode);
+ if (create)
+ log_t.op_dir_link(dirname, filename, file->fnode.ino);
+
*h = _create_writer(file);
if (boost::algorithm::ends_with(filename, ".log")) {
return w;
}
-void BlueFS::_drain_writer(FileWriter *h)
+void BlueFS::_close_writer(FileWriter *h)
{
dout(10) << __func__ << " " << h << " type " << h->writer_type << dendl;
//h->buffer.reassign_to_mempool(mempool::mempool_bluefs_file_writer);
if (h->file->fnode.size >= (1ull << 30)) {
dout(10) << __func__ << " file is unexpectedly large:" << h->file->fnode << dendl;
}
-}
-
-void BlueFS::_close_writer(FileWriter *h)
-{
- _drain_writer(h);
- delete h;
-}
-void BlueFS::close_writer(FileWriter *h)
-{
- {
- std::lock_guard l(h->lock);
- _drain_writer(h);
- }
delete h;
}
uint64_t BlueFS::debug_get_dirty_seq(FileWriter *h)
{
- std::lock_guard l(h->lock);
+ std::lock_guard l(lock);
return h->file->dirty_seq;
}
bool BlueFS::debug_get_is_dev_dirty(FileWriter *h, uint8_t dev)
{
- std::lock_guard l(h->lock);
+ std::lock_guard l(lock);
return h->dirty_devs[dev];
}
FileReader **h,
bool random)
{
- std::lock_guard nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " " << dirname << "/" << filename
<< (random ? " (random)":" (sequential)") << dendl;
- map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
- if (p == nodes.dir_map.end()) {
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
return -ENOENT;
}
std::string_view old_dirname, std::string_view old_filename,
std::string_view new_dirname, std::string_view new_filename)
{
- std::lock_guard ll(log.lock);
- std::lock_guard nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " " << old_dirname << "/" << old_filename
<< " -> " << new_dirname << "/" << new_filename << dendl;
- map<string,DirRef>::iterator p = nodes.dir_map.find(old_dirname);
- if (p == nodes.dir_map.end()) {
+ map<string,DirRef>::iterator p = dir_map.find(old_dirname);
+ if (p == dir_map.end()) {
dout(20) << __func__ << " dir " << old_dirname << " not found" << dendl;
return -ENOENT;
}
}
FileRef file = q->second;
- p = nodes.dir_map.find(new_dirname);
- if (p == nodes.dir_map.end()) {
+ p = dir_map.find(new_dirname);
+ if (p == dir_map.end()) {
dout(20) << __func__ << " dir " << new_dirname << " not found" << dendl;
return -ENOENT;
}
<< ") file " << new_filename
<< " already exists, unlinking" << dendl;
ceph_assert(q->second != file);
- log.t.op_dir_unlink(new_dirname, new_filename);
- _drop_link_D(q->second);
+ log_t.op_dir_unlink(new_dirname, new_filename);
+ _drop_link(q->second);
}
dout(10) << __func__ << " " << new_dirname << "/" << new_filename << " "
new_dir->file_map[string{new_filename}] = file;
old_dir->file_map.erase(string{old_filename});
- log.t.op_dir_link(new_dirname, new_filename, file->fnode.ino);
- log.t.op_dir_unlink(old_dirname, old_filename);
+ log_t.op_dir_link(new_dirname, new_filename, file->fnode.ino);
+ log_t.op_dir_unlink(old_dirname, old_filename);
return 0;
}
int BlueFS::mkdir(std::string_view dirname)
{
- std::lock_guard ll(log.lock);
- std::lock_guard nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " " << dirname << dendl;
- map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
- if (p != nodes.dir_map.end()) {
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p != dir_map.end()) {
dout(20) << __func__ << " dir " << dirname << " exists" << dendl;
return -EEXIST;
}
- nodes.dir_map[string{dirname}] = ceph::make_ref<Dir>();
- log.t.op_dir_create(dirname);
+ dir_map[string{dirname}] = ceph::make_ref<Dir>();
+ log_t.op_dir_create(dirname);
return 0;
}
int BlueFS::rmdir(std::string_view dirname)
{
- std::lock_guard ll(log.lock);
- std::lock_guard nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " " << dirname << dendl;
- auto p = nodes.dir_map.find(dirname);
- if (p == nodes.dir_map.end()) {
+ auto p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
dout(20) << __func__ << " dir " << dirname << " does not exist" << dendl;
return -ENOENT;
}
dout(20) << __func__ << " dir " << dirname << " not empty" << dendl;
return -ENOTEMPTY;
}
- nodes.dir_map.erase(string{dirname});
- log.t.op_dir_remove(dirname);
+ dir_map.erase(string{dirname});
+ log_t.op_dir_remove(dirname);
return 0;
}
bool BlueFS::dir_exists(std::string_view dirname)
{
- std::lock_guard nl(nodes.lock);
- map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
- bool exists = p != nodes.dir_map.end();
+ std::lock_guard l(lock);
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ bool exists = p != dir_map.end();
dout(10) << __func__ << " " << dirname << " = " << (int)exists << dendl;
return exists;
}
int BlueFS::stat(std::string_view dirname, std::string_view filename,
uint64_t *size, utime_t *mtime)
{
- std::lock_guard nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
- map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
- if (p == nodes.dir_map.end()) {
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
return -ENOENT;
}
int BlueFS::lock_file(std::string_view dirname, std::string_view filename,
FileLock **plock)
{
- std::lock_guard ll(log.lock);
- std::lock_guard nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
- map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
- if (p == nodes.dir_map.end()) {
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
return -ENOENT;
}
file = ceph::make_ref<File>();
file->fnode.ino = ++ino_last;
file->fnode.mtime = ceph_clock_now();
- nodes.file_map[ino_last] = file;
+ file_map[ino_last] = file;
dir->file_map[string{filename}] = file;
++file->refs;
- log.t.op_file_update(file->fnode);
- log.t.op_dir_link(dirname, filename, file->fnode.ino);
+ log_t.op_file_update(file->fnode);
+ log_t.op_dir_link(dirname, filename, file->fnode.ino);
} else {
file = q->second;
if (file->locked) {
int BlueFS::unlock_file(FileLock *fl)
{
- std::lock_guard nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " " << fl << " on " << fl->file->fnode << dendl;
ceph_assert(fl->file->locked);
fl->file->locked = false;
if (!dirname.empty() && dirname.back() == '/') {
dirname.remove_suffix(1);
}
- std::lock_guard nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " " << dirname << dendl;
if (dirname.empty()) {
// list dirs
- ls->reserve(nodes.dir_map.size() + 2);
- for (auto& q : nodes.dir_map) {
+ ls->reserve(dir_map.size() + 2);
+ for (auto& q : dir_map) {
ls->push_back(q.first);
}
} else {
// list files in dir
- map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
- if (p == nodes.dir_map.end()) {
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
return -ENOENT;
}
int BlueFS::unlink(std::string_view dirname, std::string_view filename)
{
- std::lock_guard ll(log.lock);
- std::lock_guard nl(nodes.lock);
+ std::lock_guard l(lock);
dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
- map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
- if (p == nodes.dir_map.end()) {
+ map<string,DirRef>::iterator p = dir_map.find(dirname);
+ if (p == dir_map.end()) {
dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
return -ENOENT;
}
return -EBUSY;
}
dir->file_map.erase(string{filename});
- log.t.op_dir_unlink(dirname, filename);
- _drop_link_D(file);
+ log_t.op_dir_unlink(dirname, filename);
+ _drop_link(file);
return 0;
}
When we find it, we decode following bytes as extent.
We read that whole extent and then check if merged with existing log part gives a proper bluefs transaction.
*/
-int BlueFS::_do_replay_recovery_read(FileReader *log_reader,
+int BlueFS::do_replay_recovery_read(FileReader *log_reader,
size_t replay_pos,
size_t read_offset,
size_t read_len,
dout(2) << __func__ << " processing " << get_device_name(dev) << dendl;
interval_set<uint64_t> disk_regions;
disk_regions.insert(0, bdev[dev]->get_size());
- for (auto f : nodes.file_map) {
+ for (auto f : file_map) {
auto& e = f.second->fnode.extents;
for (auto& p : e) {
if (p.bdev == dev) {