return ROUND_UP_TO(size, super.block_size);
}
-
bool BlueFS::_should_compact_log()
{
uint64_t current = log_writer->file->fnode.size;
void BlueFS::_compact_log_sync()
{
- // FIXME: we currently hold the lock while writing out the compacted log,
- // which may mean a latency spike. we could drop the lock while writing out
- // the big compacted log, while continuing to log at the end of the old log
- // file, and once it's done swap out the old log extents for the new ones.
dout(10) << __func__ << dendl;
File *log_file = log_writer->file.get();
logger->inc(l_bluefs_log_compactions);
}
+/*
+ * 1. Allocate a new extent to continue the log, and then log an event
+ * that jumps the log write position to the new extent. At this point, the
+ * old extent(s) will no longer be written to and contain everything that
+ * needs to be compacted.
+ * New events will be written to the new region that we'll keep.
+ *
+ * 2. While still holding the lock, encode a bufferlist that dumps all of the
+ * in-memory fnodes and names. This will become the new beginning of the
+ * log. The last event will jump to the log continuation extent from #1.
+ *
+ * 3. Queue a write to a new extent for the new beginning of the log.
+ *
+ * 4. Drop the lock and wait for the write to complete.
+ *
+ * 5. Retake the lock.
+ *
+ * 6. Update the log_fnode to splice in the new beginning.
+ *
+ * 7. Write the new superblock.
+ *
+ * 8. Release the old log space. Clean up.
+ */
+void BlueFS::_compact_log_async(std::unique_lock<std::mutex>& l)
+{
+ dout(10) << __func__ << dendl;
+ File *log_file = log_writer->file.get();
+
+ // 1. allocate new log space and jump to it.
+ old_log_jump_to = log_file->fnode.get_allocated();
+ uint64_t need = old_log_jump_to + g_conf->bluefs_max_log_runway;
+ dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
+ << " need 0x" << need << std::dec << dendl;
+ while (log_file->fnode.get_allocated() < need) {
+ int r = _allocate(log_file->fnode.prefer_bdev,
+ g_conf->bluefs_max_log_runway,
+ &log_file->fnode.extents);
+ assert(r == 0);
+ }
+ dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
+
+  // log an update to the log file's fnode and a jump to the offset where
+  // new entries will be written
+ log_t.op_file_update(log_file->fnode);
+ log_t.op_jump(log_seq, old_log_jump_to);
+ _flush_and_sync_log(l, 0, old_log_jump_to);
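+  // everything before old_log_jump_to is now sealed: the jump record is
+  // durable and the writer sits at the start of the fresh runway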
+
+ // 2. prepare compacted log
+ bluefs_transaction_t t;
+ _compact_log_dump_metadata(&t);
+
+ bufferlist bl;
+ ::encode(t, bl);
+ _pad_bl(bl);
+
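+  // reserve room for the compacted metadata plus a (padded) trailing jump
+  // record, rounded up to the allocation unit; the old log's live tail will
+  // resume at this offset in the new log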
+ new_log_jump_to = ROUND_UP_TO(bl.length() + super.block_size,
+ g_conf->bluefs_alloc_size);
+ bluefs_transaction_t t2;
+ t2.op_jump(log_seq, new_log_jump_to);
+ ::encode(t2, bl);
+ _pad_bl(bl);
+ dout(10) << __func__ << " new_log_jump_to 0x" << std::hex << new_log_jump_to
+ << std::dec << dendl;
+
+  // create the new log file and a writer for it
+ new_log = new File;
+ new_log->fnode.ino = 0; // so that _flush_range won't try to log the fnode
+ int r = _allocate(BlueFS::BDEV_DB, new_log_jump_to,
+ &new_log->fnode.extents);
+ assert(r == 0);
+ new_log_writer = _create_writer(new_log);
+ new_log_writer->append(bl);
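+  // the compacted metadata dump becomes the head of the replacement log;
+  // replay will read it and then jump straight to new_log_jump_to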
+
+ // 3. flush
+ _flush(new_log_writer, true);
+ lock.unlock();
+
+ // 4. wait
+ dout(10) << __func__ << " waiting for compacted log to sync" << dendl;
+ wait_for_aio(new_log_writer);
+ flush_bdev();
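+  // the compacted header is now fully on disk, but nothing references it
+  // yet; a crash at this point still replays the old log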
+
+ // 5. retake lock
+ lock.lock();
+
+ // 6. update our log fnode
+  // discard the first old_log_jump_to bytes of the old log's extents
+ dout(10) << __func__ << " remove 0x" << std::hex << old_log_jump_to << std::dec
+ << " of " << log_file->fnode.extents << dendl;
+ uint64_t discarded = 0;
+ vector<bluefs_extent_t> old_extents;
+ while (discarded < old_log_jump_to) {
+ bluefs_extent_t& e = log_file->fnode.extents.front();
+ bluefs_extent_t temp = e;
+ if (discarded + e.length <= old_log_jump_to) {
+ dout(10) << __func__ << " remove old log extent " << e << dendl;
+ discarded += e.length;
+ log_file->fnode.extents.erase(log_file->fnode.extents.begin());
+ } else {
+ dout(10) << __func__ << " remove front of old log extent " << e << dendl;
+ uint64_t drop = old_log_jump_to - discarded;
+ temp.length = drop;
+ e.offset += drop;
+ e.length -= drop;
+ discarded += drop;
+ dout(10) << __func__ << " kept " << e << " removed " << temp << dendl;
+ }
+ old_extents.push_back(temp);
+ }
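+  // old_extents now holds exactly the first old_log_jump_to bytes of the
+  // old log; the remaining tail is still live and is spliced into the
+  // new log below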
+ new_log->fnode.extents.insert(new_log->fnode.extents.end(),
+ log_file->fnode.extents.begin(),
+ log_file->fnode.extents.end());
+
+  // clear the extents from the old log file; they were appended to the
+  // new log above
+  log_file->fnode.extents.clear();
+
+  // swap the extent lists so the old File object, which log_writer still
+  // references, now owns the compacted log's extents
+  log_file->fnode.extents.swap(new_log->fnode.extents);
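+  // a tail byte formerly at offset X now lives at X - old_log_jump_to +
+  // new_log_jump_to, so shift the writer position (and file size) accordingly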
+ log_writer->pos = log_writer->file->fnode.size =
+ log_writer->pos - old_log_jump_to + new_log_jump_to;
+
+ // 7. write the super block to reflect the changes
+ dout(10) << __func__ << " writing super" << dendl;
+ super.log_fnode = log_file->fnode;
+ ++super.version;
+ _write_super();
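+  // the superblock now points at the compacted log; from here on, replay
+  // starts from the new log and the old extents are dead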
+
+ lock.unlock();
+ flush_bdev();
+ lock.lock();
+
+ // 8. release old space
+ dout(10) << __func__ << " release old log extents " << old_extents << dendl;
+ for (auto& r : old_extents) {
+ alloc[r.bdev]->release(r.offset, r.length);
+ }
+
+  // tear down the temporary new_log: close its writer and remove it from
+  // the dirty files list
+ _close_writer(new_log_writer);
+ dirty_files.erase(dirty_files.iterator_to(*new_log));
+ new_log_writer = nullptr;
+ new_log = nullptr;
+ log_cond.notify_all();
+
+ dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
+ logger->inc(l_bluefs_log_compactions);
+}
+
void BlueFS::_pad_bl(bufferlist& bl)
{
uint64_t partial = bl.length() % super.block_size;
if (runway < g_conf->bluefs_min_log_runway) {
dout(10) << __func__ << " allocating more log runway (0x"
<< std::hex << runway << std::dec << " remaining)" << dendl;
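+    // an async compaction may be rewriting the log's extent list; wait for
+    // it to finish so this allocation doesn't race with the extent splicing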
+ while (new_log_writer) {
+ dout(10) << __func__ << " waiting for async compaction" << dendl;
+ log_cond.wait(l);
+ }
int r = _allocate(log_writer->file->fnode.prefer_bdev,
g_conf->bluefs_max_log_runway,
&log_writer->file->fnode.extents);
File *file = &(*p);
assert(file->dirty_seq > 0);
if (file->dirty_seq <= log_seq_stable) {
- dout(20) << __func__ << " cleaned file " << file->fnode << dendl;
- file->dirty_seq = 0;
- dirty_files.erase(p++);
+ dout(20) << __func__ << " cleaned file " << file->fnode << dendl;
+ file->dirty_seq = 0;
+ dirty_files.erase(p++);
} else {
- ++p;
+ ++p;
}
}
} else {
dout(20) << __func__ << " log_seq_stable " << log_seq_stable
- << " already > out seq " << seq
- << ", we lost a race against another log flush, done" << dendl;
+ << " already > out seq " << seq
+ << ", we lost a race against another log flush, done" << dendl;
}
-
_update_logger_stats();
return 0;
int r = _allocate(h->file->fnode.prefer_bdev,
offset + length - allocated,
&h->file->fnode.extents);
- if (r < 0)
+ if (r < 0) {
+ derr << __func__ << " allocated: " << allocated << \
+ " offset: " << offset << " length: " << length << dendl;
return r;
+ }
must_dirty = true;
}
if (h->file->fnode.size < offset + length) {
p->commit_finish();
}
}
+
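+  // compact synchronously (holding the lock for the whole rewrite) or
+  // asynchronously (logging continues on the old log's tail), per config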
if (_should_compact_log()) {
- _compact_log_sync();
+ if (g_conf->bluefs_compact_log_sync) {
+ _compact_log_sync();
+ } else {
+ _compact_log_async(l);
+ }
}
+
utime_t end = ceph_clock_now(NULL);
utime_t dur = end - start;
dout(10) << __func__ << " done in " << dur << dendl;