// 1. allocate new log space and jump to it.
old_log_jump_to = log_file->fnode.get_allocated();
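+ // runway: unused space remaining in the log's currently allocated extents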
+ uint64_t runway = log_file->fnode.get_allocated() - log_writer->get_effective_write_pos();
dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
<< " need 0x" << (old_log_jump_to + cct->_conf->bluefs_max_log_runway) << std::dec << dendl;
int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint),
log_t.op_file_update(log_file->fnode);
log_t.op_jump(log_seq, old_log_jump_to);
- flush_bdev(); // FIXME?
+ // we need to flush all bdevs because we will be streaming all dirty files to the log
+ // TODO: if _flush_and_sync_log_jump neither adds dirty files nor releases pending allocations,
+ // then flush_bdev() will not be necessary
+ flush_bdev();
- _flush_and_sync_log(l, 0, old_log_jump_to);
+ _flush_and_sync_log_jump(old_log_jump_to, runway);
// 2. prepare compacted log
bluefs_transaction_t t;
}
}
-
-int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
- uint64_t want_seq,
- uint64_t jump_to)
+// Adds the file modifications recorded in `dirty_files` to log_t.
+// Note: some bluefs ops may already have been stored in the log_t transaction.
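+// Returns the log sequence number assigned to this batch of changes.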
+uint64_t BlueFS::_consume_dirty()
{
- while (log_flushing) {
- dout(10) << __func__ << " want_seq " << want_seq
- << " log is currently flushing, waiting" << dendl;
- ceph_assert(!jump_to);
- log_cond.wait(l);
- }
- if (want_seq && want_seq <= log_seq_stable) {
- dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
- << log_seq_stable << ", done" << dendl;
- ceph_assert(!jump_to);
- return 0;
- }
- if (log_t.empty() && dirty_files.empty()) {
- dout(10) << __func__ << " want_seq " << want_seq
- << " " << log_t << " not dirty, dirty_files empty, no-op" << dendl;
- ceph_assert(!jump_to);
- return 0;
- }
-
- vector<interval_set<uint64_t>> to_release(pending_release.size());
- to_release.swap(pending_release);
-
+ // acquire new seq
+ // this will become log_seq_stable once we write
uint64_t seq = log_t.seq = ++log_seq;
- ceph_assert(want_seq == 0 || want_seq <= seq);
log_t.uuid = super.uuid;
// log dirty files
+ // we just incremented log_seq. It is now illegal to add to dirty_files[log_seq]
auto lsi = dirty_files.find(seq);
if (lsi != dirty_files.end()) {
dout(20) << __func__ << " " << lsi->second.size() << " dirty_files" << dendl;
log_t.op_file_update_inc(f.fnode);
}
}
+ return seq;
+}
- dout(10) << __func__ << " " << log_t << dendl;
- ceph_assert(!log_t.empty());
-
+// Extends the log if its free space is smaller than bluefs_min_log_runway.
+// Returns the space available *BEFORE* adding new space; signed so that errors (<0) can be returned.
+int64_t BlueFS::_maybe_extend_log()
+{
// allocate some more space (before we run out)?
// BTW: this triggers `flush()` in the `page_aligned_appender` of `log_writer`.
int64_t runway = log_writer->file->fnode.get_allocated() -
log_writer->get_effective_write_pos();
- bool just_expanded_log = false;
if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) {
dout(10) << __func__ << " allocating more log runway (0x"
<< std::hex << runway << std::dec << " remaining)" << dendl;
- while (new_log_writer) {
- dout(10) << __func__ << " waiting for async compaction" << dendl;
- log_cond.wait(l);
+ /*
+ * Usually, when we run low on space in the log, we just allocate a new extent,
+ * put an update op for the log file into the log, and we are fine.
+ * The problem is that this interferes with log compaction:
+ * the new log produced by compaction ends with a jump to some offset (the anchor) of the current log.
+ * It is assumed that the log region (anchor - end) contains all changes made by bluefs since
+ * the full state was captured into the new log.
+ * Putting a log-file update into the (anchor - end) region is illegal, because any update there must be
+ * compatible with both logs, but the old log differs from the new log.
+ *
+ * Possible solutions:
+ * - stall extending the log until we finish compacting and switch logs (CURRENT)
+ * - re-run compaction with more runway for the old log
+ * - add an OP_FILE_ADDEXT op that adds an extent; it would be compatible with both logs
+ */
+ if (new_log_writer) {
+ return -EWOULDBLOCK;
}
vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
int r = _allocate(
ceph_assert(r == 0);
vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
log_t.op_file_update_inc(log_writer->file->fnode);
- just_expanded_log = true;
}
+ return runway;
+}
+
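+// Encodes log_t, appends it to the log file and flushes it.
+// The encoded transaction must fit within `runway` bytes.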
+void BlueFS::_flush_and_sync_log_core(int64_t runway)
+{
+ dout(10) << __func__ << " " << log_t << dendl;
bufferlist bl;
bl.reserve(super.block_size);
logger->inc(l_bluefs_logged_bytes, bl.length());
- if (just_expanded_log) {
-   ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss
- }
+ // the transaction must fit into the runway available before growth; otherwise part of it would
+ // land in extents that _replay does not yet know about -> unrecoverable data loss
+ ceph_assert(bl.length() <= runway);
log_writer->append(bl);
log_t.clear();
log_t.seq = 0; // just so debug output is less confusing
- log_flushing = true;
int r = _flush(log_writer, true);
ceph_assert(r == 0);
+}
- if (jump_to) {
- dout(10) << __func__ << " jumping log offset from 0x" << std::hex
- << log_writer->pos << " -> 0x" << jump_to << std::dec << dendl;
- log_writer->pos = jump_to;
- vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
- log_writer->file->fnode.size = jump_to;
- vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
- }
-
- _flush_bdev_safely(log_writer);
-
- log_flushing = false;
- log_cond.notify_all();
-
+// Advances log_seq_stable to `seq` (if newer) and clears dirty_files up to and including it.
+void BlueFS::_clear_dirty_set_stable(uint64_t seq)
+{
// clean dirty files
if (seq > log_seq_stable) {
log_seq_stable = seq;
dout(20) << __func__ << " log_seq_stable " << log_seq_stable << dendl;
+ // undirty all files that were already streamed to log
auto p = dirty_files.begin();
while (p != dirty_files.end()) {
if (p->first > log_seq_stable) {
<< " already >= out seq " << seq
<< ", we lost a race against another log flush, done" << dendl;
}
+}
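+
+// Returns the extents in `to_release` to their respective block allocators.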
+void BlueFS::_release_pending_allocations(vector<interval_set<uint64_t>>& to_release)
+{
for (unsigned i = 0; i < to_release.size(); ++i) {
if (!to_release[i].empty()) {
/* OK, now we have the guarantee alloc[i] won't be null. */
}
}
}
+}
+
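+// Flushes pending log_t ops and dirty files to the log and syncs it to disk.
+// Returns immediately if `want_seq` is already stable.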
+int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
+ uint64_t want_seq)
+{
+ if (want_seq && want_seq <= log_seq_stable) {
+ dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
+ << log_seq_stable << ", done" << dendl;
+ return 0;
+ }
+ int64_t available_runway;
+ do {
+ available_runway = _maybe_extend_log();
+ if (available_runway == -EWOULDBLOCK) {
+ while (new_log_writer) {
+ dout(10) << __func__ << " waiting for async compaction" << dendl;
+ log_cond.wait(l);
+ }
+ }
+ } while (available_runway == -EWOULDBLOCK);
+
+ ceph_assert(want_seq == 0 || want_seq <= log_seq + 1); // illegal to request seq that was not created yet
+
+ uint64_t seq = _consume_dirty();
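+ // stash pending extent releases; they can be freed only after this log flush is durable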
+ vector<interval_set<uint64_t>> to_release(pending_release.size());
+ to_release.swap(pending_release);
+
+ _flush_and_sync_log_core(available_runway);
+ _flush_bdev_safely(log_writer);
+
+ _clear_dirty_set_stable(seq);
+ _release_pending_allocations(to_release);
_update_logger_stats();
+ return 0;
+}
+
+// Flushes the log and immediately moves log_writer's position to `jump_to`.
+int BlueFS::_flush_and_sync_log_jump(uint64_t jump_to,
+ int64_t available_runway)
+{
+ ceph_assert(jump_to);
+ uint64_t seq = _consume_dirty();
+ vector<interval_set<uint64_t>> to_release(pending_release.size());
+ to_release.swap(pending_release);
+
+ _flush_and_sync_log_core(available_runway);
+
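+ // the just-flushed transaction ends with an op_jump to `jump_to` (added by the caller);
+ // reposition the writer so subsequent log entries land where replay will resume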
+ dout(10) << __func__ << " jumping log offset from 0x" << std::hex
+ << log_writer->pos << " -> 0x" << jump_to << std::dec << dendl;
+ log_writer->pos = jump_to;
+ vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
+ log_writer->file->fnode.size = jump_to;
+ vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
+ _flush_bdev_safely(log_writer);
+
+ _clear_dirty_set_stable(seq);
+ _release_pending_allocations(to_release);
+
+ _update_logger_stats();
return 0;
}