out.append(ss);
} else if (command == "bluefs files list") {
const char* devnames[3] = {"wal","db","slow"};
- std::lock_guard l(bluefs->lock);
+ std::lock_guard l(bluefs->dirs_lock);
f->open_array_section("files");
for (auto &d : bluefs->dir_map) {
std::string dir = d.first;
delete logger;
}
+//AK - TODO - locking is likely needed here, but it is not yet certain which lock
void BlueFS::_update_logger_stats()
{
// we must be holding the lock
uint64_t BlueFS::get_used()
{
- std::lock_guard l(lock);
uint64_t used = 0;
for (unsigned id = 0; id < MAX_BDEV; ++id) {
used += _get_used(id);
{
ceph_assert(id < alloc.size());
ceph_assert(alloc[id]);
- std::lock_guard l(lock);
return _get_used(id);
}
uint64_t BlueFS::get_total(unsigned id)
{
- std::lock_guard l(lock);
return _get_total(id);
}
uint64_t BlueFS::get_free(unsigned id)
{
- std::lock_guard l(lock);
ceph_assert(id < alloc.size());
return alloc[id]->get_free();
}
int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
dout(10) << __func__ << " bdev " << id << dendl;
ceph_assert(id < alloc.size());
for (auto& p : file_map) {
int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
{
- std::unique_lock l(lock);
dout(1) << __func__
<< " osd_uuid " << osd_uuid
<< dendl;
// initial txn
log_t.op_init();
- _flush_and_sync_log(l);
+ flush_and_sync_log();
// write supers
super.log_fnode = log_file->fnode;
new_log_dev_cur = BDEV_NEWDB;
new_log_dev_next = BDEV_DB;
}
- _rewrite_log_and_layout_sync(false,
+ rewrite_log_and_layout_sync(false,
BDEV_NEWDB,
new_log_dev_cur,
new_log_dev_next,
layout);
//}
} else if(id == BDEV_NEWWAL) {
- _rewrite_log_and_layout_sync(false,
+ rewrite_log_and_layout_sync(false,
BDEV_DB,
BDEV_NEWWAL,
BDEV_WAL,
int BlueFS::fsck()
{
- std::lock_guard l(lock);
dout(1) << __func__ << dendl;
// hrm, i think we check everything on mount...
return 0;
new_log_dev_next;
}
- _rewrite_log_and_layout_sync(
+ rewrite_log_and_layout_sync(
false,
(flags & REMOVE_DB) ? BDEV_SLOW : BDEV_DB,
new_log_dev_cur,
BDEV_DB :
BDEV_SLOW;
- _rewrite_log_and_layout_sync(
+ rewrite_log_and_layout_sync(
false,
super_dev,
new_log_dev_cur,
dout(20) << __func__ << " had refs " << file->refs
<< " on " << file->fnode << dendl;
ceph_assert(file->refs > 0);
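+ // the caller must already hold log_lock: op_file_remove below appends to log_t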
+ ceph_assert(ceph_mutex_is_locked(log_lock));
+
--file->refs;
if (file->refs == 0) {
dout(20) << __func__ << " destroying " << file->fnode << dendl;
ceph_assert(file->num_reading.load() == 0);
vselector->sub_usage(file->vselector_hint, file->fnode);
log_t.op_file_remove(file->fnode.ino);
+
+ std::lock_guard dl(dirty_lock);
for (auto& r : file->fnode.extents) {
pending_release[r.bdev].insert(r.offset, r.length);
}
file->deleted = true;
if (file->dirty_seq) {
+ // retract the request to serialize this file's changes
ceph_assert(file->dirty_seq > log_seq_stable);
ceph_assert(dirty_files.count(file->dirty_seq));
auto it = dirty_files[file->dirty_seq].iterator_to(*file);
return ret;
}
-void BlueFS::_invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
+void BlueFS::invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
{
+ std::lock_guard l(f->lock);
dout(10) << __func__ << " file " << f->fnode
<< " 0x" << std::hex << offset << "~" << length << std::dec
<< dendl;
}
}
-uint64_t BlueFS::_estimate_log_size()
+uint64_t BlueFS::estimate_log_size()
{
+ std::lock_guard dirl(dirs_lock);
int avg_dir_size = 40; // fixme
int avg_file_size = 12;
uint64_t size = 4096 * 2;
void BlueFS::compact_log()
{
- std::unique_lock<ceph::mutex> l(lock);
if (!cct->_conf->bluefs_replay_recovery_disable_compact) {
if (cct->_conf->bluefs_compact_log_sync) {
- _compact_log_sync();
+ compact_log_sync();
} else {
- _compact_log_async(l);
+ compact_log_async();
}
}
}
-bool BlueFS::_should_compact_log()
+bool BlueFS::should_start_compact_log()
{
- uint64_t current = log_writer->file->fnode.size;
- uint64_t expected = _estimate_log_size();
+ if (log_is_compacting.load() == true) {
+ // compaction is already running
+ return false;
+ }
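+ // snapshot the current log size under log_lock; the log file is written
+ // (and its fnode mutated) while log_lock is held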
+ uint64_t current;
+ {
+ std::lock_guard ll(log_lock);
+ current = log_writer->file->fnode.size;
+ }
+ uint64_t expected = estimate_log_size();
float ratio = (float)current / (float)expected;
dout(10) << __func__ << " current 0x" << std::hex << current
<< " expected " << expected << std::dec
<< " ratio " << ratio
- << (new_log ? " (async compaction in progress)" : "")
<< dendl;
- if (new_log ||
- current < cct->_conf->bluefs_log_compact_min_size ||
+ if (current < cct->_conf->bluefs_log_compact_min_size ||
ratio < cct->_conf->bluefs_log_compact_min_ratio) {
return false;
}
return true;
}
-void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t,
+void BlueFS::compact_log_dump_metadata(bluefs_transaction_t *t,
int flags)
{
+ std::lock_guard dirl(dirs_lock);
+
t->seq = 1;
t->uuid = super.uuid;
dout(20) << __func__ << " op_init" << dendl;
if (ino == 1)
continue;
ceph_assert(ino > 1);
-
+ //AK - TODO - we touch fnode here - locking is needed
for(auto& e : file_ref->fnode.extents) {
auto bdev = e.bdev;
auto bdev_new = bdev;
}
}
-void BlueFS::_compact_log_sync()
+void BlueFS::compact_log_sync()
{
dout(10) << __func__ << dendl;
auto prefer_bdev =
vselector->select_prefer_bdev(log_writer->file->vselector_hint);
- _rewrite_log_and_layout_sync(true,
+ rewrite_log_and_layout_sync(true,
BDEV_DB,
prefer_bdev,
prefer_bdev,
logger->inc(l_bluefs_log_compactions);
}
-void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback,
- int super_dev,
- int log_dev,
- int log_dev_new,
- int flags,
- std::optional<bluefs_layout_t> layout)
+void BlueFS::rewrite_log_and_layout_sync(bool allocate_with_fallback,
+ int super_dev,
+ int log_dev,
+ int log_dev_new,
+ int flags,
+ std::optional<bluefs_layout_t> layout)
{
+ //ceph_assert(ceph_mutex_is_not_locked(log_lock));
+ std::lock_guard ll(log_lock);
+
File *log_file = log_writer->file.get();
// clear out log (be careful who calls us!!!)
<< " flags:" << flags
<< dendl;
bluefs_transaction_t t;
- _compact_log_dump_metadata(&t, flags);
+ compact_log_dump_metadata(&t, flags);
dout(20) << __func__ << " op_jump_seq " << log_seq << dendl;
t.op_jump_seq(log_seq);
flush_bdev();
dout(10) << __func__ << " release old log extents " << old_fnode.extents << dendl;
+ std::lock_guard dl(dirty_lock);
for (auto& r : old_fnode.extents) {
pending_release[r.bdev].insert(r.offset, r.length);
}
*
* 8. Release the old log space. Clean up.
*/
-void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
+
+void BlueFS::compact_log_async()
{
dout(10) << __func__ << dendl;
+ // only one compaction allowed at one time
+ bool old_is_comp = std::atomic_exchange(&log_is_compacting, true);
+ if (old_is_comp) {
+ dout(10) << __func__ << " ongoing" << dendl;
+ return;
+ }
+
+ log_lock.lock();
File *log_file = log_writer->file.get();
- ceph_assert(!new_log);
- ceph_assert(!new_log_writer);
+ FileWriter *new_log_writer = nullptr;
+ FileRef new_log = nullptr;
+ uint64_t new_log_jump_to = 0;
+ uint64_t old_log_jump_to = 0;
- // create a new log [writer] so that we know compaction is in progress
- // (see _should_compact_log)
new_log = ceph::make_ref<File>();
- new_log->fnode.ino = 0; // so that _flush_range won't try to log the fnode
+ new_log->fnode.ino = 0; // we use _flush_special to avoid logging the fnode
- // 0. wait for any racing flushes to complete. (We do not want to block
- // in _flush_sync_log with jump_to set or else a racing thread might flush
- // our entries and our jump_to update won't be correct.)
- while (log_flushing) {
- dout(10) << __func__ << " log is currently flushing, waiting" << dendl;
- log_cond.wait(l);
- }
+ // Part 1.
+ // Prepare the current log for jumping into it.
+ // 1. Allocate a new extent.
+ // 2. Add an update op to the log.
+ // 3. Add a jump op to the log.
+ // While this happens no one else may write to the log, otherwise we risk jumping backwards.
+ // We also need to sync the log, because we are injecting a discontinuity and the writer is not prepared for that.
+
+ //signal _maybe_extend_log that expanding the log is temporarily unacceptable
+ bool old_forbidden = atomic_exchange(&log_forbidden_to_expand, true);
+ ceph_assert(old_forbidden == false);
vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
- // 1. allocate new log space and jump to it.
+ // 1.1 allocate new log space and jump to it.
old_log_jump_to = log_file->fnode.get_allocated();
uint64_t runway = log_file->fnode.get_allocated() - log_writer->get_effective_write_pos();
dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
// update the log file change and log a jump to the offset where we want to
// write the new entries
- log_t.op_file_update(log_file->fnode);
- log_t.op_jump(log_seq, old_log_jump_to);
+ log_t.op_file_update(log_file->fnode); // 1.2
+ log_t.op_jump(log_seq, old_log_jump_to); // 1.3
// we need to flush all bdev because we will be streaming all dirty files to log
// TODO - think - if _flush_and_sync_log_jump will not add dirty files nor release pending allocations
// then flush_bdev() will not be necessary
flush_bdev();
-
_flush_and_sync_log_jump(old_log_jump_to, runway);
+ log_lock.unlock();
+ // out of the jump section - the log can now be written to again
+
// 2. prepare compacted log
bluefs_transaction_t t;
- //avoid record two times in log_t and _compact_log_dump_metadata.
- log_t.clear();
- _compact_log_dump_metadata(&t, 0);
+ //this needs the files lock
+ //what happens if a file is modified *twice* before we stream it to the log?
+ //the later state that we capture will be seen earlier, and replay will observe a temporary retraction (!)
+ compact_log_dump_metadata(&t, 0);
uint64_t max_alloc_size = std::max(alloc_size[BDEV_WAL],
std::max(alloc_size[BDEV_DB],
ceph_assert(r == 0);
// we might have some more ops in log_t due to _allocate call
- t.claim_ops(log_t);
+ // t.claim_ops(log_t); // no - we no longer track allocations in the log
bufferlist bl;
encode(t, bl);
<< std::dec << dendl;
new_log_writer = _create_writer(new_log);
- new_log_writer->append(bl);
+ new_log_writer->append(bl);
+ new_log_writer->lock.lock();
// 3. flush
r = _flush_special(new_log_writer);
ceph_assert(r == 0);
// 4. wait
- _flush_bdev_safely(new_log_writer);
-
+ _flush_bdev(new_log_writer);
+ new_log_writer->lock.unlock();
// 5. update our log fnode
// discard first old_log_jump_to extents
++super.version;
_write_super(BDEV_DB);
- lock.unlock();
flush_bdev();
- lock.lock();
+
+ old_forbidden = atomic_exchange(&log_forbidden_to_expand, false);
+ ceph_assert(old_forbidden == true);
+ //wake up anyone who was waiting to expand the log
+ log_cond.notify_all();
// 7. release old space
dout(10) << __func__ << " release old log extents " << old_extents << dendl;
- for (auto& r : old_extents) {
- pending_release[r.bdev].insert(r.offset, r.length);
+ {
+ std::lock_guard dl(dirty_lock);
+ for (auto& r : old_extents) {
+ pending_release[r.bdev].insert(r.offset, r.length);
+ }
}
// delete the new log, remove from the dirty files list
_close_writer(new_log_writer);
+ // _flush_special does not dirty files
+ /*
if (new_log->dirty_seq) {
+ std::lock_guard dl(dirty_lock);
ceph_assert(dirty_files.count(new_log->dirty_seq));
auto it = dirty_files[new_log->dirty_seq].iterator_to(*new_log);
dirty_files[new_log->dirty_seq].erase(it);
}
+ */
new_log_writer = nullptr;
new_log = nullptr;
log_cond.notify_all();
dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
logger->inc(l_bluefs_log_compactions);
+
+ old_is_comp = atomic_exchange(&log_is_compacting, false);
+ ceph_assert(old_is_comp);
}
void BlueFS::_pad_bl(bufferlist& bl)
}
}
+/**
+ * Adds file modifications from `dirty_files` to the bluefs transaction
+ * already stored in `log_t`, writes them to disk, and waits until they are stable.
+ * Guarantees that file modifications up to `want_seq` are stable on disk afterwards.
+ * In addition, it may insert a jump-forward transaction to log write position `jump_to`.
+ *
+ * It must lock out concurrent attempts to:
+ * 1) add to log_t
+ * 2) modify dirty_files
+ * 3) add to pending_release
+ *
+ * pending_release and log_t are covered by the same lock
+ */
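+// Sketch of the lock protocol as implemented in flush_and_sync_log() below:
+// take log_lock, then dirty_lock; capture the dirty set and pending_release
+// under dirty_lock; drop dirty_lock before doing the actual I/O; drop
+// log_lock once the log device has been flushed.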
+
// Adds to log_t file modifications mentioned in `dirty_files`.
// Note: some bluefs ops may have already been stored in log_t transaction.
uint64_t BlueFS::_consume_dirty()
{
+ ceph_assert(ceph_mutex_is_locked(dirty_lock));
+ ceph_assert(ceph_mutex_is_locked(log_lock));
//acquire new seq
// this will become log_seq_stable once we write
uint64_t seq = log_t.seq = ++log_seq;
// Returns space available *BEFORE* adding new space. Signed for additional <0 detection.
int64_t BlueFS::_maybe_extend_log()
{
+ ceph_assert(ceph_mutex_is_locked(log_lock));
// allocate some more space (before we run out)?
// BTW: this triggers `flush()` in the `page_aligned_appender` of `log_writer`.
int64_t runway = log_writer->file->fnode.get_allocated() -
* - re-run compaction with more runway for old log
* - add OP_FILE_ADDEXT that adds extent; will be compatible with both logs
*/
- if (new_log_writer) {
+ if (log_forbidden_to_expand.load() == true) {
return -EWOULDBLOCK;
}
vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
void BlueFS::_flush_and_sync_log_core(int64_t runway)
{
+ ceph_assert(ceph_mutex_is_locked(log_lock));
dout(10) << __func__ << " " << log_t << dendl;
bufferlist bl;
// Clears dirty_files up to (including) seq_stable.
void BlueFS::_clear_dirty_set_stable(uint64_t seq)
{
+ std::lock_guard lg(dirty_lock);
+
// clean dirty files
if (seq > log_seq_stable) {
log_seq_stable = seq;
}
}
-int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
- uint64_t want_seq)
+int BlueFS::flush_and_sync_log(uint64_t want_seq)
{
- if (want_seq && want_seq <= log_seq_stable) {
- dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
- << log_seq_stable << ", done" << dendl;
- return 0;
- }
+ // we serialize writes to the log by taking log_lock
int64_t available_runway;
do {
+ log_lock.lock();
+ dirty_lock.lock();
+ if (want_seq && want_seq <= log_seq_stable) {
+ dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
+ << log_seq_stable << ", done" << dendl;
+ dirty_lock.unlock();
+ log_lock.unlock();
+ return 0;
+ }
+
available_runway = _maybe_extend_log();
if (available_runway == -EWOULDBLOCK) {
- while (new_log_writer) {
- dout(10) << __func__ << " waiting for async compaction" << dendl;
- log_cond.wait(l);
+ // we need to add runway, but the log is currently being switched by a compaction
+ dirty_lock.unlock();
+ // instead of log_lock.unlock(), transfer ownership to a unique_lock so we can wait
+ std::unique_lock<ceph::mutex> ll(log_lock, std::adopt_lock);
+ while (log_forbidden_to_expand.load()) {
+ log_cond.wait(ll);
}
+ } else {
+ ceph_assert(available_runway >= 0);
}
- } while (available_runway == -EWOULDBLOCK);
+ } while (available_runway < 0);
ceph_assert(want_seq == 0 || want_seq <= log_seq + 1); // illegal to request seq that was not created yet
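+ // both log_lock and dirty_lock are held here; capture the dirty set and the
+ // pending releases atomically before dropping dirty_lock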
uint64_t seq = _consume_dirty();
vector<interval_set<uint64_t>> to_release(pending_release.size());
to_release.swap(pending_release);
+ dirty_lock.unlock();
_flush_and_sync_log_core(available_runway);
- _flush_bdev_safely(log_writer);
+ _flush_bdev(log_writer);
+ //now log_lock is no longer needed
+ log_lock.unlock();
_clear_dirty_set_stable(seq);
_release_pending_allocations(to_release);
int BlueFS::_flush_and_sync_log_jump(uint64_t jump_to,
int64_t available_runway)
{
+ ceph_assert(ceph_mutex_is_locked(log_lock));
+
ceph_assert(jump_to);
+ // we serialize writes to the log by taking log_lock
+
+ dirty_lock.lock();
uint64_t seq = _consume_dirty();
vector<interval_set<uint64_t>> to_release(pending_release.size());
to_release.swap(pending_release);
-
+ dirty_lock.unlock();
_flush_and_sync_log_core(available_runway);
dout(10) << __func__ << " jumping log offset from 0x" << std::hex
log_writer->file->fnode.size = jump_to;
vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
- _flush_bdev_safely(log_writer);
+ _flush_bdev(log_writer);
_clear_dirty_set_stable(seq);
_release_pending_allocations(to_release);
const unsigned length,
const bluefs_super_t& super)
{
+ ceph_assert(ceph_mutex_is_locked(this->lock) || file->fnode.ino == 1);
ceph::bufferlist bl;
if (partial) {
tail_block.splice(0, tail_block.length(), &bl);
int BlueFS::_signal_dirty_to_log(FileWriter *h)
{
+ ceph_assert(ceph_mutex_is_locked(h->lock));
+ std::lock_guard dl(dirty_lock);
if (h->file->deleted) {
dout(10) << __func__ << " deleted, no-op" << dendl;
return 0;
return 0;
}
+void BlueFS::flush_range(FileWriter *h, uint64_t offset, uint64_t length) {
+ std::unique_lock hl(h->lock);
+ _flush_range(h, offset, length);
+}
+
int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
{
+ ceph_assert(ceph_mutex_is_locked(h->lock));
dout(10) << __func__ << " " << h << " pos 0x" << std::hex << h->pos
<< " 0x" << offset << "~" << length << std::dec
<< " to " << h->file->fnode << dendl;
<< std::hex << offset << "~" << length << std::dec
<< dendl;
}
+ std::lock_guard file_lock(h->file->lock);
ceph_assert(offset <= h->file->fnode.size);
uint64_t allocated = h->file->fnode.get_allocated();
vselector->sub_usage(h->file->vselector_hint, h->file->fnode);
// do not bother to dirty the file if we are overwriting
// previously allocated extents.
-
if (allocated < offset + length) {
// we should never run out of log space here; see the min runway check
// in _flush_and_sync_log.
h->file->fnode.size = offset + length;
h->file->is_dirty = true;
}
+
dout(20) << __func__ << " file now, unflushed " << h->file->fnode << dendl;
return _flush_data(h, offset, length, buffered);
}
int BlueFS::_flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered)
{
+ //ceph_assert(ceph_mutex_is_locked(h->lock));
+ //ceph_assert(ceph_mutex_is_locked(h->file->lock));
uint64_t x_off = 0;
auto p = h->file->fnode.seek(offset, &x_off);
ceph_assert(p != h->file->fnode.extents.end());
}
#endif
-int BlueFS::_flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l)
+
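+// Appends `len` bytes to the writer, flushing as needed so the in-memory
+// buffer never exceeds 1GB; a flush is also issued once the buffer reaches
+// bluefs_min_flush_size. Takes h->lock internally, so the caller does not
+// need to hold it.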
+void BlueFS::append_try_flush(FileWriter *h, const char* buf, size_t len)
+{
+ std::unique_lock hl(h->lock);
+ size_t max_size = 1ull << 30; // cap to 1GB
+ while (len > 0) {
+ bool need_flush = true;
+ auto l0 = h->get_buffer_length();
+ if (l0 < max_size) {
+ size_t l = std::min(len, max_size - l0);
+ h->append(buf, l);
+ buf += l;
+ len -= l;
+ need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size;
+ }
+ if (need_flush) {
+ bool flushed = false;
+ int r = _flush(h, true, &flushed);
+ ceph_assert(r == 0);
+ if (r == 0 && flushed) {
+ maybe_compact_log();
+ }
+ // make sure we've made any progress with flush hence the
+ // loop doesn't iterate forever
+ ceph_assert(h->get_buffer_length() < max_size);
+ }
+ }
+}
+
+void BlueFS::flush(FileWriter *h, bool force)
{
+ std::unique_lock hl(h->lock);
bool flushed = false;
int r = _flush(h, force, &flushed);
+ ceph_assert(r == 0);
if (r == 0 && flushed) {
- _maybe_compact_log(l);
+ maybe_compact_log();
}
- return r;
}
int BlueFS::_flush(FileWriter *h, bool force, bool *flushed)
{
+ ceph_assert(ceph_mutex_is_locked(h->lock));
uint64_t length = h->get_buffer_length();
uint64_t offset = h->pos;
if (flushed) {
// smart enough to discover it on its own.
int BlueFS::_flush_special(FileWriter *h)
{
+ //ceph_assert(ceph_mutex_is_locked(h->lock));
+ //ceph_assert(ceph_mutex_is_locked(h->file->lock));
uint64_t length = h->get_buffer_length();
uint64_t offset = h->pos;
ceph_assert(length + offset <= h->file->fnode.get_allocated());
return _flush_data(h, offset, length, false);
}
-int BlueFS::_truncate(FileWriter *h, uint64_t offset)
+int BlueFS::truncate(FileWriter *h, uint64_t offset)
{
+ std::lock_guard hl(h->lock);
dout(10) << __func__ << " 0x" << std::hex << offset << std::dec
<< " file " << h->file->fnode << dendl;
if (h->file->deleted) {
ceph_abort_msg("truncate up not supported");
}
ceph_assert(h->file->fnode.size >= offset);
- _flush_bdev_safely(h);
+ _flush_bdev(h);
vselector->sub_usage(h->file->vselector_hint, h->file->fnode.size);
h->file->fnode.size = offset;
vselector->add_usage(h->file->vselector_hint, h->file->fnode.size);
+ std::lock_guard ll(log_lock);
log_t.op_file_update_inc(h->file->fnode);
return 0;
}
-int BlueFS::_fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l)
+int BlueFS::fsync(FileWriter *h)
{
+ std::unique_lock hl(h->lock);
dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl;
int r = _flush(h, true);
if (r < 0)
}
uint64_t old_dirty_seq = h->file->dirty_seq;
- _flush_bdev_safely(h);
+ _flush_bdev(h);
if (old_dirty_seq) {
uint64_t s = log_seq;
dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq
<< ") on " << h->file->fnode << ", flushing log" << dendl;
- _flush_and_sync_log(l, old_dirty_seq);
+ flush_and_sync_log(old_dirty_seq);
+ // AK - TODO - think: how can dirty_seq change while we hold h->lock?
ceph_assert(h->file->dirty_seq == 0 || // cleaned
- h->file->dirty_seq > s); // or redirtied by someone else
+ h->file->dirty_seq > s); // or redirtied by someone else
}
+ maybe_compact_log();
return 0;
}
-void BlueFS::_flush_bdev_safely(FileWriter *h)
+// be careful - either h->lock or log_lock must be held when calling this
+void BlueFS::_flush_bdev(FileWriter *h)
{
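+ // ino 1 is the bluefs log itself; it is written while holding log_lock rather
+ // than a FileWriter lock, hence the split assertion below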
+ if (h->file->fnode.ino != 1) {
+ ceph_assert(ceph_mutex_is_locked(h->lock));
+ } else {
+ ceph_assert(ceph_mutex_is_locked(log_lock));
+ }
std::array<bool, MAX_BDEV> flush_devs = h->dirty_devs;
h->dirty_devs.fill(false);
#ifdef HAVE_LIBAIO
if (!cct->_conf->bluefs_sync_write) {
list<aio_t> completed_ios;
_claim_completed_aios(h, &completed_ios);
- lock.unlock();
wait_for_aio(h);
completed_ios.clear();
- flush_bdev(flush_devs);
- lock.lock();
- } else
-#endif
- {
- lock.unlock();
- flush_bdev(flush_devs);
- lock.lock();
}
+#endif
+ flush_bdev(flush_devs);
}
void BlueFS::flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs)
return 0;
}
-int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
+int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len)
{
+ std::lock_guard fl(f->lock);
dout(10) << __func__ << " file " << f->fnode << " 0x"
<< std::hex << off << "~" << len << std::dec << dendl;
if (f->deleted) {
vselector->add_usage(f->vselector_hint, f->fnode);
if (r < 0)
return r;
+ std::lock_guard ll(log_lock);
log_t.op_file_update_inc(f->fnode);
}
return 0;
void BlueFS::sync_metadata(bool avoid_compact)
{
- std::unique_lock l(lock);
- if (log_t.empty() && dirty_files.empty()) {
+ bool can_skip_flush;
+ {
+ std::lock_guard ll(log_lock);
+ std::lock_guard dl(dirty_lock);
+ can_skip_flush = log_t.empty() && dirty_files.empty();
+ }
+ if (can_skip_flush) {
dout(10) << __func__ << " - no pending log events" << dendl;
} else {
utime_t start;
start = ceph_clock_now();
*_dout << dendl;
flush_bdev(); // FIXME?
- _flush_and_sync_log(l);
+ flush_and_sync_log();
dout(10) << __func__ << " done in " << (ceph_clock_now() - start) << dendl;
}
if (!avoid_compact) {
- _maybe_compact_log(l);
+ maybe_compact_log();
}
}
-void BlueFS::_maybe_compact_log(std::unique_lock<ceph::mutex>& l)
+void BlueFS::maybe_compact_log()
{
if (!cct->_conf->bluefs_replay_recovery_disable_compact &&
- _should_compact_log()) {
+ should_start_compact_log()) {
if (cct->_conf->bluefs_compact_log_sync) {
- _compact_log_sync();
+ compact_log_sync();
} else {
- _compact_log_async(l);
+ compact_log_async();
}
}
}
FileWriter **h,
bool overwrite)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
map<string,DirRef>::iterator p = dir_map.find(dirname);
DirRef dir;
dout(20) << __func__ << " mapping " << dirname << "/" << filename
<< " vsel_hint " << file->vselector_hint
<< dendl;
-
- log_t.op_file_update(file->fnode);
- if (create)
- log_t.op_dir_link(dirname, filename, file->fnode.ino);
-
+ {
+ std::lock_guard ll(log_lock);
+ log_t.op_file_update(file->fnode);
+ if (create)
+ log_t.op_dir_link(dirname, filename, file->fnode.ino);
+ }
*h = _create_writer(file);
if (boost::algorithm::ends_with(filename, ".log")) {
return w;
}
-void BlueFS::_close_writer(FileWriter *h)
+void BlueFS::_drain_writer(FileWriter *h)
{
dout(10) << __func__ << " " << h << " type " << h->writer_type << dendl;
//h->buffer.reassign_to_mempool(mempool::mempool_bluefs_file_writer);
if (h->file->fnode.size >= (1ull << 30)) {
dout(10) << __func__ << " file is unexpectedly large:" << h->file->fnode << dendl;
}
+}
+
+void BlueFS::_close_writer(FileWriter *h)
+{
+ _drain_writer(h);
+ delete h;
+}
+void BlueFS::close_writer(FileWriter *h)
+{
+ {
+ std::lock_guard l(h->lock);
+ _drain_writer(h);
+ }
delete h;
}
uint64_t BlueFS::debug_get_dirty_seq(FileWriter *h)
{
- std::lock_guard l(lock);
+ std::lock_guard l(h->lock);
return h->file->dirty_seq;
}
bool BlueFS::debug_get_is_dev_dirty(FileWriter *h, uint8_t dev)
{
- std::lock_guard l(lock);
+ std::lock_guard l(h->lock);
return h->dirty_devs[dev];
}
FileReader **h,
bool random)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
dout(10) << __func__ << " " << dirname << "/" << filename
<< (random ? " (random)":" (sequential)") << dendl;
map<string,DirRef>::iterator p = dir_map.find(dirname);
std::string_view old_dirname, std::string_view old_filename,
std::string_view new_dirname, std::string_view new_filename)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
+ std::lock_guard ll(log_lock);
dout(10) << __func__ << " " << old_dirname << "/" << old_filename
<< " -> " << new_dirname << "/" << new_filename << dendl;
map<string,DirRef>::iterator p = dir_map.find(old_dirname);
int BlueFS::mkdir(std::string_view dirname)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
+ std::lock_guard ll(log_lock);
dout(10) << __func__ << " " << dirname << dendl;
map<string,DirRef>::iterator p = dir_map.find(dirname);
if (p != dir_map.end()) {
int BlueFS::rmdir(std::string_view dirname)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
+ std::lock_guard ll(log_lock);
dout(10) << __func__ << " " << dirname << dendl;
auto p = dir_map.find(dirname);
if (p == dir_map.end()) {
bool BlueFS::dir_exists(std::string_view dirname)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
map<string,DirRef>::iterator p = dir_map.find(dirname);
bool exists = p != dir_map.end();
dout(10) << __func__ << " " << dirname << " = " << (int)exists << dendl;
int BlueFS::stat(std::string_view dirname, std::string_view filename,
uint64_t *size, utime_t *mtime)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
map<string,DirRef>::iterator p = dir_map.find(dirname);
if (p == dir_map.end()) {
int BlueFS::lock_file(std::string_view dirname, std::string_view filename,
FileLock **plock)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
map<string,DirRef>::iterator p = dir_map.find(dirname);
if (p == dir_map.end()) {
file_map[ino_last] = file;
dir->file_map[string{filename}] = file;
++file->refs;
+ std::lock_guard ll(log_lock);
log_t.op_file_update(file->fnode);
log_t.op_dir_link(dirname, filename, file->fnode.ino);
} else {
int BlueFS::unlock_file(FileLock *fl)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
dout(10) << __func__ << " " << fl << " on " << fl->file->fnode << dendl;
ceph_assert(fl->file->locked);
fl->file->locked = false;
if (!dirname.empty() && dirname.back() == '/') {
dirname.remove_suffix(1);
}
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
dout(10) << __func__ << " " << dirname << dendl;
if (dirname.empty()) {
// list dirs
int BlueFS::unlink(std::string_view dirname, std::string_view filename)
{
- std::lock_guard l(lock);
+ std::lock_guard dirl(dirs_lock);
+ std::lock_guard ll(log_lock);
dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
map<string,DirRef>::iterator p = dir_map.find(dirname);
if (p == dir_map.end()) {
std::atomic_int num_reading;
void* vselector_hint = nullptr;
+ /* lock protects fnode and the other parts that can be modified during read & write operations.
+    It does not protect values that are fixed.
+    It does not need to be taken for one-time operations:
+    _replay, device_migrate_to_existing, device_migrate_to_new */
+ ceph::mutex lock = ceph::make_mutex("BlueFS::File::lock");
private:
FRIEND_MAKE_REF(File);
};
private:
- ceph::mutex lock = ceph::make_mutex("BlueFS::lock");
-
+ ceph::mutex log_lock = ceph::make_mutex("BlueFS::log_lock");
+ ceph::mutex dirs_lock = ceph::make_mutex("BlueFS::dirs_lock");
+ ceph::mutex dirty_lock = ceph::make_mutex("BlueFS::dirty_lock");
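+ // Lock nesting as used in this change: dir operations take dirs_lock before
+ // log_lock; log writes take log_lock before dirty_lock; FileWriter::lock is
+ // taken before File::lock and before log_lock / dirty_lock.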
PerfCounters *logger = nullptr;
uint64_t max_bytes[MAX_BDEV] = {0};
FileWriter *log_writer = 0; ///< writer for the log
bluefs_transaction_t log_t; ///< pending, unwritten log transaction
bool log_flushing = false; ///< true while flushing the log
- ceph::condition_variable log_cond;
-
- uint64_t new_log_jump_to = 0;
- uint64_t old_log_jump_to = 0;
- FileRef new_log = nullptr;
- FileWriter *new_log_writer = nullptr;
+ ceph::condition_variable log_cond; ///< used for state control between log flush / log compaction
+ ceph::mutex cond_lock = ceph::make_mutex("BlueFS::cond_lock"); ///< ....
+ std::atomic<bool> log_is_compacting{false}; ///< set while compaction of the bluefs log is in progress
+ std::atomic<bool> log_forbidden_to_expand{false}; ///< signals that async compaction is in a state
+ /// that prohibits expansion of the bluefs log
/*
* There are up to 3 block devices:
*
std::vector<Allocator*> alloc; ///< allocators for bdevs
std::vector<uint64_t> alloc_size; ///< alloc size for each device
std::vector<interval_set<uint64_t>> pending_release; ///< extents to release
+ // TODO: it should be examined what makes pending_release immune to
+ // the era problem that dirty_files has. Hints:
+ // 1) we actually have only 2 eras: log_seq and log_seq+1
+ // 2) we usually do not remove extents from files; and when we do, we force log-syncing.
+
//std::vector<interval_set<uint64_t>> block_unused_too_granular;
BlockDevice::aio_callback_t discard_cb[3]; //discard callbacks for each dev
int _signal_dirty_to_log(FileWriter *h);
int _flush_range(FileWriter *h, uint64_t offset, uint64_t length);
int _flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered);
- int _flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l);
int _flush(FileWriter *h, bool force, bool *flushed = nullptr);
int _flush_special(FileWriter *h);
- int _fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l);
+ int _fsync(FileWriter *h);
#ifdef HAVE_LIBAIO
void _claim_completed_aios(FileWriter *h, std::list<aio_t> *ls);
void _flush_and_sync_log_core(int64_t available_runway);
int _flush_and_sync_log_jump(uint64_t jump_to,
int64_t available_runway);
- int _flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
- uint64_t want_seq = 0);
+ int flush_and_sync_log(uint64_t want_seq = 0);
- uint64_t _estimate_log_size();
- bool _should_compact_log();
+ uint64_t estimate_log_size();
+ bool should_start_compact_log();
enum {
REMOVE_DB = 1,
RENAME_SLOW2DB = 4,
RENAME_DB2SLOW = 8,
};
- void _compact_log_dump_metadata(bluefs_transaction_t *t,
- int flags);
- void _compact_log_sync();
- void _compact_log_async(std::unique_lock<ceph::mutex>& l);
+ void compact_log_dump_metadata(bluefs_transaction_t *t,
+ int flags);
+ void compact_log_sync();
+ void compact_log_async();
- void _rewrite_log_and_layout_sync(bool allocate_with_fallback,
+ void rewrite_log_and_layout_sync(bool allocate_with_fallback,
int super_dev,
int log_dev,
int new_log_dev,
//void _aio_finish(void *priv);
- void _flush_bdev_safely(FileWriter *h);
+ void _flush_bdev(FileWriter *h);
void flush_bdev(); // this is safe to call without a lock
void flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs); // this is safe to call without a lock
uint64_t len, ///< [in] this many bytes
char *out); ///< [out] optional: or copy it here
- void _invalidate_cache(FileRef f, uint64_t offset, uint64_t length);
-
int _open_super();
int _write_super(int dev);
int _check_allocations(const bluefs_fnode_t& fnode,
int _replay(bool noop, bool to_stdout = false); ///< replay journal
FileWriter *_create_writer(FileRef f);
+ void _drain_writer(FileWriter *h);
void _close_writer(FileWriter *h);
// always put the super in the second 4k block. FIXME should this be
FileReader **h,
bool random = false);
- void close_writer(FileWriter *h) {
- std::lock_guard l(lock);
- _close_writer(h);
- }
+ // data added after last fsync() is lost
+ void close_writer(FileWriter *h);
int rename(std::string_view old_dir, std::string_view old_file,
std::string_view new_dir, std::string_view new_file);
/// sync any uncommitted state to disk
void sync_metadata(bool avoid_compact);
/// test and compact log, if necessary
- void _maybe_compact_log(std::unique_lock<ceph::mutex>& l);
+ void maybe_compact_log();
void set_volume_selector(BlueFSVolumeSelector* s) {
vselector.reset(s);
// handler for discard event
void handle_discard(unsigned dev, interval_set<uint64_t>& to_release);
- void flush(FileWriter *h, bool force = false) {
- std::unique_lock l(lock);
- int r = _flush(h, force, l);
- ceph_assert(r == 0);
- }
+ void flush(FileWriter *h, bool force = false);
- void append_try_flush(FileWriter *h, const char* buf, size_t len) {
- size_t max_size = 1ull << 30; // cap to 1GB
- while (len > 0) {
- bool need_flush = true;
- auto l0 = h->get_buffer_length();
- if (l0 < max_size) {
- size_t l = std::min(len, max_size - l0);
- h->append(buf, l);
- buf += l;
- len -= l;
- need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size;
- }
- if (need_flush) {
- flush(h, true);
- // make sure we've made any progress with flush hence the
- // loop doesn't iterate forever
- ceph_assert(h->get_buffer_length() < max_size);
- }
- }
- }
- void flush_range(FileWriter *h, uint64_t offset, uint64_t length) {
- std::lock_guard l(lock);
- _flush_range(h, offset, length);
- }
- int fsync(FileWriter *h) {
- std::unique_lock l(lock);
- int r = _fsync(h, l);
- _maybe_compact_log(l);
- return r;
- }
+ void append_try_flush(FileWriter *h, const char* buf, size_t len);
+ void flush_range(FileWriter *h, uint64_t offset, uint64_t length);
+ int fsync(FileWriter *h);
int64_t read(FileReader *h, uint64_t offset, size_t len,
ceph::buffer::list *outbl, char *out) {
// no need to hold the global lock here; we only touch h and
// atomics and asserts).
return _read_random(h, offset, len, out);
}
- void invalidate_cache(FileRef f, uint64_t offset, uint64_t len) {
- std::lock_guard l(lock);
- _invalidate_cache(f, offset, len);
- }
- int preallocate(FileRef f, uint64_t offset, uint64_t len) {
- std::lock_guard l(lock);
- return _preallocate(f, offset, len);
- }
- int truncate(FileWriter *h, uint64_t offset) {
- std::lock_guard l(lock);
- return _truncate(h, offset);
- }
+ void invalidate_cache(FileRef f, uint64_t offset, uint64_t len);
+ int preallocate(FileRef f, uint64_t offset, uint64_t len);
+ int truncate(FileWriter *h, uint64_t offset);
int do_replay_recovery_read(FileReader *log,
size_t log_pos,
size_t read_offset,