using std::byte;
using std::list;
-using std::make_pair;
using std::map;
using std::ostream;
-using std::pair;
using std::set;
using std::string;
using std::to_string;
using std::vector;
-using std::chrono::duration;
using std::chrono::seconds;
using ceph::bufferlist;
// Part 0.
// Lock the log and forbid its expansion and other compactions
+ // lock log's run-time structures for a while
+ log.lock.lock();
+
+ // Extend the log in case a big transaction is waiting, before starting compaction.
+ _maybe_extend_log();
+
// only one compaction allowed at a time
bool old_is_comp = std::atomic_exchange(&log_is_compacting, true);
if (old_is_comp) {
dout(10) << __func__ << " ongoing" <<dendl;
+ log.lock.unlock();
return;
}
- // lock log's run-time structures for a while
- log.lock.lock();
auto t0 = mono_clock::now();
// Part 1.
// While this is in progress, no one else can write to the log, otherwise we risk jumping backwards.
// We need to sync log, because we are injecting discontinuity, and writer is not prepared for that.
- //signal _maybe_extend_log that expansion of log is temporary inacceptable
+ // signal _extend_log that expansion of the log is temporarily unacceptable
bool old_forbidden = atomic_exchange(&log_forbidden_to_expand, true);
ceph_assert(old_forbidden == false);
// 1.1 allocate new log extents and store them at fnode_tail
File *log_file = log.writer->file.get();
+
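+ // anchor point: the new, compacted log will end with a jump to this offset,
+ // so everything appended to the old log past it must be valid for both logs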
old_log_jump_to = log_file->fnode.get_allocated();
bluefs_fnode_t fnode_tail;
- uint64_t runway = log_file->fnode.get_allocated() - log.writer->get_effective_write_pos();
dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
<< " need 0x" << cct->_conf->bluefs_max_log_runway << std::dec << dendl;
int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint),
// TODO - think - if _flush_and_sync_log_jump will neither add dirty files nor release
// pending allocations, then flush_bdev() will not be necessary
_flush_bdev();
- _flush_and_sync_log_jump_D(old_log_jump_to, runway);
+ _flush_and_sync_log_jump_D(old_log_jump_to);
//
// Part 2.
}
}
-// Extends log if its free space is smaller then bluefs_min_log_runway.
-// Returns space available *BEFORE* adding new space. Signed for additional <0 detection.
-int64_t BlueFS::_maybe_extend_log()
-{
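+// Ensures the log has enough runway for the pending transaction plus
+// bluefs_min_log_runway, extending it if necessary.
+// Returns the runway available *AFTER* any extension.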
+int64_t BlueFS::_maybe_extend_log()
+{
+ uint64_t runway = log.writer->file->fnode.get_allocated() - log.writer->get_effective_write_pos();
+ // Increasing the size of the log involves adding an OP_FILE_UPDATE_INC op whose
+ // encoded size grows with the number of extents. bluefs_min_log_runway should cover
+ // the maximum size such an update can reach.
+ // Allocate new space if the pending transaction would not fit in the current runway.
+ size_t expected_log_size = 0;
+ log.t.bound_encode(expected_log_size);
+ if (expected_log_size + cct->_conf->bluefs_min_log_runway > runway) {
+ _extend_log(expected_log_size + cct->_conf->bluefs_max_log_runway);
+ } else if (runway < cct->_conf->bluefs_min_log_runway) {
+ _extend_log(cct->_conf->bluefs_max_log_runway);
+ }
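+ // recompute the runway, which the calls above may have grown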
+ runway = log.writer->file->fnode.get_allocated() - log.writer->get_effective_write_pos();
+ return runway;
+}
+
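+// Extends the log file by at least `amount` bytes (rounded up to the block size).
+// Waits while compaction temporarily forbids expansion. The new extents are made
+// discoverable on replay by appending an OP_FILE_UPDATE_INC transaction into the
+// still-unused part of the old allocation.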
+void BlueFS::_extend_log(uint64_t amount)
+{
ceph_assert(ceph_mutex_is_locked(log.lock));
- // allocate some more space (before we run out)?
- // BTW: this triggers `flush()` in the `page_aligned_appender` of `log.writer`.
- int64_t runway = log.writer->file->fnode.get_allocated() -
- log.writer->get_effective_write_pos();
- if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) {
- dout(10) << __func__ << " allocating more log runway (0x"
- << std::hex << runway << std::dec << " remaining)" << dendl;
- /*
- * Usually, when we are low on space in log, we just allocate new extent,
- * put update op(log) to log and we are fine.
- * Problem - it interferes with log compaction:
- * New log produced in compaction will include - as last op - jump into some offset (anchor) of current log.
- * It is assumed that log region (anchor - end) will contain all changes made by bluefs since
- * full state capture into new log.
- * Putting log update into (anchor - end) region is illegal, because any update there must be compatible with
- * both logs, but old log is different then new log.
- *
- * Possible solutions:
- * - stall extending log until we finish compacting and switch log (CURRENT)
- * - re-run compaction with more runway for old log
- * - add OP_FILE_ADDEXT that adds extent; will be compatible with both logs
- */
- if (log_forbidden_to_expand.load() == true) {
- return -EWOULDBLOCK;
- }
- vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
- int r = _allocate(
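+ // compaction may temporarily forbid log expansion; adopt log.lock into a
+ // unique_lock (already held) so the condvar can release it while we wait,
+ // then hand ownership back without unlocking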
+ std::unique_lock<ceph::mutex> ll(log.lock, std::adopt_lock);
+ while (log_forbidden_to_expand.load() == true) {
+ log_cond.wait(ll);
+ }
+ ll.release();
+ uint64_t allocated_before_extension = log.writer->file->fnode.get_allocated();
+ vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
+ amount = round_up_to(amount, super.block_size);
+ int r = _allocate(
vselector->select_prefer_bdev(log.writer->file->vselector_hint),
- cct->_conf->bluefs_max_log_runway,
+ amount,
0,
&log.writer->file->fnode);
- ceph_assert(r == 0);
- vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
- log.t.op_file_update_inc(log.writer->file->fnode);
+ ceph_assert(r == 0);
+ dout(10) << "extended log by 0x" << std::hex << amount << " bytes " << dendl;
+ vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
+
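+ // Record the new extents with a separate, block-padded OP_FILE_UPDATE_INC
+ // transaction appended directly to the log writer. It must land inside the
+ // space allocated before the extension (asserted below) so that replay can
+ // find it and learn about the new extents.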
+ bluefs_transaction_t log_extend_transaction;
+ log_extend_transaction.seq = log.t.seq;
+ log_extend_transaction.uuid = log.t.uuid;
+ log_extend_transaction.op_file_update_inc(log.writer->file->fnode);
+
+ bufferlist bl;
+ bl.reserve(super.block_size);
+ encode(log_extend_transaction, bl);
+ _pad_bl(bl, super.block_size);
+ log.writer->append(bl);
+ ceph_assert(allocated_before_extension >= log.writer->get_effective_write_pos());
+ log.t.seq = log.seq_live;
+
+ // before sync_core we advance the seq
+ {
+ std::unique_lock<ceph::mutex> l(dirty.lock);
+ _log_advance_seq();
}
- return runway;
}
-void BlueFS::_flush_and_sync_log_core(int64_t runway)
+void BlueFS::_flush_and_sync_log_core()
{
ceph_assert(ceph_mutex_is_locked(log.lock));
dout(10) << __func__ << " " << log.t << dendl;
+
bufferlist bl;
bl.reserve(super.block_size);
encode(log.t, bl);
logger->inc(l_bluefs_log_write_count, 1);
logger->inc(l_bluefs_logged_bytes, bl.length());
- if (true) {
- ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss
- // transaction will not fit extents before growth -> data loss on _replay
- }
+ uint64_t runway = log.writer->file->fnode.get_allocated() - log.writer->get_effective_write_pos();
+ // Ensure the runway is big enough; _maybe_extend_log should have guaranteed this,
+ // but keep the assert as a safeguard: overrunning the runway would mean
+ // unrecoverable data loss on replay.
+ ceph_assert(bl.length() <= runway);
+
log.writer->append(bl);
int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq)
{
- int64_t available_runway;
- do {
- log.lock.lock();
- dirty.lock.lock();
- if (want_seq && want_seq <= dirty.seq_stable) {
- dout(10) << __func__ << " want_seq " << want_seq << " <= seq_stable "
- << dirty.seq_stable << ", done" << dendl;
- dirty.lock.unlock();
- log.lock.unlock();
- return 0;
- }
-
- available_runway = _maybe_extend_log();
- if (available_runway == -EWOULDBLOCK) {
- // we are in need of adding runway, but we are during log-switch from compaction
- dirty.lock.unlock();
- //instead log.lock.unlock() do move ownership
- std::unique_lock<ceph::mutex> ll(log.lock, std::adopt_lock);
- while (log_forbidden_to_expand.load()) {
- log_cond.wait(ll);
- }
- } else {
- ceph_assert(available_runway >= 0);
- }
- } while (available_runway < 0);
+ log.lock.lock();
+ dirty.lock.lock();
+ if (want_seq && want_seq <= dirty.seq_stable) {
+ dout(10) << __func__ << " want_seq " << want_seq << " <= seq_stable "
+ << dirty.seq_stable << ", done" << dendl;
+ dirty.lock.unlock();
+ log.lock.unlock();
+ return 0;
+ }
ceph_assert(want_seq == 0 || want_seq <= dirty.seq_live); // illegal to request seq that was not created yet
uint64_t seq = _log_advance_seq();
to_release.swap(dirty.pending_release);
dirty.lock.unlock();
- _flush_and_sync_log_core(available_runway);
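+ // grow the runway now if log.t will not fit, then flush it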
+ _maybe_extend_log();
+ _flush_and_sync_log_core();
_flush_bdev(log.writer);
logger->set(l_bluefs_log_bytes, log.writer->file->fnode.size);
//now log.lock is no longer needed
}
// Flushes log and immediately adjusts log_writer pos.
-int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to,
- int64_t available_runway)
+int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to)
{
ceph_assert(ceph_mutex_is_locked(log.lock));
vector<interval_set<uint64_t>> to_release(dirty.pending_release.size());
to_release.swap(dirty.pending_release);
dirty.lock.unlock();
- _flush_and_sync_log_core(available_runway);
+ _flush_and_sync_log_core();
dout(10) << __func__ << " jumping log offset from 0x" << std::hex
<< log.writer->pos << " -> 0x" << jump_to << std::dec << dendl;