From: Sage Weil Date: Fri, 22 Oct 2021 02:18:12 +0000 (-0400) Subject: Revert "Merge pull request #42099 from aclamk/wip-bluefs-fine-grain-locking-2" X-Git-Tag: v17.1.0~598^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=e4306d84ad281048d7e44efe051cb5b8c5a8e602;p=ceph.git Revert "Merge pull request #42099 from aclamk/wip-bluefs-fine-grain-locking-2" This reverts commit 44e89c613c6b882f3436714fdaa6c9c04b94b62d, reversing changes made to 552ac6657753f00f039f267e20e5a4c399c2bb10. This is causing failures in ObjectStore/StoreTestSpecificAUSize.SyntheticMatrixCsumVsCompression/2 and ObjectStore/StoreTestSpecificAUSize.SpilloverFixedTest/2 Signed-off-by: Sage Weil --- diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc index 7add76833705f..f34996be3e000 100644 --- a/src/os/bluestore/BlueFS.cc +++ b/src/os/bluestore/BlueFS.cc @@ -148,9 +148,9 @@ private: out.append(ss); } else if (command == "bluefs files list") { const char* devnames[3] = {"wal","db","slow"}; - std::lock_guard l(bluefs->nodes.lock); + std::lock_guard l(bluefs->lock); f->open_array_section("files"); - for (auto &d : bluefs->nodes.dir_map) { + for (auto &d : bluefs->dir_map) { std::string dir = d.first; for (auto &r : d.second->file_map) { f->open_object_section("file"); @@ -310,12 +310,11 @@ void BlueFS::_shutdown_logger() delete logger; } -//AK - TODO - locking needed but not certain void BlueFS::_update_logger_stats() { // we must be holding the lock - logger->set(l_bluefs_num_files, nodes.file_map.size()); - logger->set(l_bluefs_log_bytes, log.writer->file->fnode.size); + logger->set(l_bluefs_num_files, file_map.size()); + logger->set(l_bluefs_log_bytes, log_writer->file->fnode.size); if (alloc[BDEV_WAL]) { logger->set(l_bluefs_wal_total_bytes, _get_total(BDEV_WAL)); @@ -393,6 +392,7 @@ void BlueFS::handle_discard(unsigned id, interval_set& to_release) uint64_t BlueFS::get_used() { + std::lock_guard l(lock); uint64_t used = 0; for (unsigned id = 0; id < MAX_BDEV; ++id) { used += _get_used(id); @@ -418,6 +418,7 @@ uint64_t BlueFS::get_used(unsigned id) { ceph_assert(id < alloc.size()); ceph_assert(alloc[id]); + std::lock_guard l(lock); return _get_used(id); } @@ -430,11 +431,13 @@ uint64_t BlueFS::_get_total(unsigned id) const uint64_t BlueFS::get_total(unsigned id) { + std::lock_guard l(lock); return _get_total(id); } uint64_t BlueFS::get_free(unsigned id) { + std::lock_guard l(lock); ceph_assert(id < alloc.size()); return alloc[id]->get_free(); } @@ -464,10 +467,10 @@ void BlueFS::dump_block_extents(ostream& out) int BlueFS::get_block_extents(unsigned id, interval_set *extents) { - std::lock_guard nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " bdev " << id << dendl; ceph_assert(id < alloc.size()); - for (auto& p : nodes.file_map) { + for (auto& p : file_map) { for (auto& q : p.second->fnode.extents) { if (q.bdev == id) { extents->insert(q.offset, q.length); @@ -479,6 +482,7 @@ int BlueFS::get_block_extents(unsigned id, interval_set *extents) int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout) { + std::unique_lock l(lock); dout(1) << __func__ << " osd_uuid " << osd_uuid << dendl; @@ -511,24 +515,22 @@ int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout) &log_file->fnode); vselector->add_usage(log_file->vselector_hint, log_file->fnode); ceph_assert(r == 0); - log.writer = _create_writer(log_file); + log_writer = _create_writer(log_file); // initial txn - ceph_assert(log.seq_live == 1); - log.t.seq = 1; - log.t.op_init(); - _flush_and_sync_log_LD(); + log_t.op_init(); + _flush_and_sync_log(l); // write supers super.log_fnode = log_file->fnode; super.memorized_layout = layout; _write_super(BDEV_DB); - _flush_bdev(); + flush_bdev(); // clean up super = bluefs_super_t(); - _close_writer(log.writer); - log.writer = NULL; + _close_writer(log_writer); + log_writer = NULL; vselector.reset(nullptr); _stop_alloc(); _shutdown_logger(); @@ -783,7 +785,7 @@ int BlueFS::mount() } // init freelist - for (auto& p : nodes.file_map) { + for (auto& p : file_map) { dout(30) << __func__ << " noting alloc for " << p.second->fnode << dendl; for (auto& q : p.second->fnode.extents) { bool is_shared = is_shared_alloc(q.bdev); @@ -806,11 +808,11 @@ int BlueFS::mount() } // set up the log for future writes - log.writer = _create_writer(_get_file(1)); - ceph_assert(log.writer->file->fnode.ino == 1); - log.writer->pos = log.writer->file->fnode.size; + log_writer = _create_writer(_get_file(1)); + ceph_assert(log_writer->file->fnode.ino == 1); + log_writer->pos = log_writer->file->fnode.size; dout(10) << __func__ << " log write pos set to 0x" - << std::hex << log.writer->pos << std::dec + << std::hex << log_writer->pos << std::dec << dendl; return 0; @@ -843,15 +845,15 @@ void BlueFS::umount(bool avoid_compact) sync_metadata(avoid_compact); - _close_writer(log.writer); - log.writer = NULL; + _close_writer(log_writer); + log_writer = NULL; vselector.reset(nullptr); _stop_alloc(); - nodes.file_map.clear(); - nodes.dir_map.clear(); + file_map.clear(); + dir_map.clear(); super = bluefs_super_t(); - log.t.clear(); + log_t.clear(); _shutdown_logger(); } @@ -866,7 +868,7 @@ int BlueFS::prepare_new_device(int id, const bluefs_layout_t& layout) new_log_dev_cur = BDEV_NEWDB; new_log_dev_next = BDEV_DB; } - _rewrite_log_and_layout_sync_LN_LD(false, + _rewrite_log_and_layout_sync(false, BDEV_NEWDB, new_log_dev_cur, new_log_dev_next, @@ -874,7 +876,7 @@ int BlueFS::prepare_new_device(int id, const bluefs_layout_t& layout) layout); //} } else if(id == BDEV_NEWWAL) { - _rewrite_log_and_layout_sync_LN_LD(false, + _rewrite_log_and_layout_sync(false, BDEV_DB, BDEV_NEWWAL, BDEV_WAL, @@ -905,6 +907,7 @@ void BlueFS::get_devices(set *ls) int BlueFS::fsck() { + std::lock_guard l(lock); dout(1) << __func__ << dendl; // hrm, i think we check everything on mount... return 0; @@ -1035,7 +1038,7 @@ int BlueFS::_replay(bool noop, bool to_stdout) { dout(10) << __func__ << (noop ? " NO-OP" : "") << dendl; ino_last = 1; // by the log - uint64_t log_seq = 0; + log_seq = 0; FileRef log_file; log_file = _get_file(1); @@ -1089,7 +1092,7 @@ int BlueFS::_replay(bool noop, bool to_stdout) int r = _read(log_reader, read_pos, super.block_size, &bl, NULL); if (r != (int)super.block_size && cct->_conf->bluefs_replay_recovery) { - r += _do_replay_recovery_read(log_reader, pos, read_pos + r, super.block_size - r, &bl); + r += do_replay_recovery_read(log_reader, pos, read_pos + r, super.block_size - r, &bl); } assert(r == (int)super.block_size); read_pos += r; @@ -1149,7 +1152,7 @@ int BlueFS::_replay(bool noop, bool to_stdout) << ", which is past eof" << dendl; if (cct->_conf->bluefs_replay_recovery) { //try to search for more data - r += _do_replay_recovery_read(log_reader, pos, read_pos + r, more - r, &t); + r += do_replay_recovery_read(log_reader, pos, read_pos + r, more - r, &t); if (r < (int)more) { //in normal mode we must read r==more, for recovery it is too strict break; @@ -1223,7 +1226,7 @@ int BlueFS::_replay(bool noop, bool to_stdout) << std::endl; } - ceph_assert(next_seq > log_seq); + ceph_assert(next_seq >= log_seq); log_seq = next_seq - 1; // we will increment it below uint64_t skip = offset - read_pos; if (skip) { @@ -1251,7 +1254,7 @@ int BlueFS::_replay(bool noop, bool to_stdout) << ": op_jump_seq " << next_seq << std::endl; } - ceph_assert(next_seq > log_seq); + ceph_assert(next_seq >= log_seq); log_seq = next_seq - 1; // we will increment it below } break; @@ -1299,8 +1302,8 @@ int BlueFS::_replay(bool noop, bool to_stdout) if (!noop) { FileRef file = _get_file(ino); ceph_assert(file->fnode.ino); - map::iterator q = nodes.dir_map.find(dirname); - ceph_assert(q != nodes.dir_map.end()); + map::iterator q = dir_map.find(dirname); + ceph_assert(q != dir_map.end()); map::iterator r = q->second->file_map.find(filename); ceph_assert(r == q->second->file_map.end()); @@ -1330,8 +1333,8 @@ int BlueFS::_replay(bool noop, bool to_stdout) } if (!noop) { - map::iterator q = nodes.dir_map.find(dirname); - ceph_assert(q != nodes.dir_map.end()); + map::iterator q = dir_map.find(dirname); + ceph_assert(q != dir_map.end()); map::iterator r = q->second->file_map.find(filename); ceph_assert(r != q->second->file_map.end()); ceph_assert(r->second->refs > 0); @@ -1353,9 +1356,9 @@ int BlueFS::_replay(bool noop, bool to_stdout) } if (!noop) { - map::iterator q = nodes.dir_map.find(dirname); - ceph_assert(q == nodes.dir_map.end()); - nodes.dir_map[dirname] = ceph::make_ref(); + map::iterator q = dir_map.find(dirname); + ceph_assert(q == dir_map.end()); + dir_map[dirname] = ceph::make_ref(); } } break; @@ -1372,10 +1375,10 @@ int BlueFS::_replay(bool noop, bool to_stdout) } if (!noop) { - map::iterator q = nodes.dir_map.find(dirname); - ceph_assert(q != nodes.dir_map.end()); + map::iterator q = dir_map.find(dirname); + ceph_assert(q != dir_map.end()); ceph_assert(q->second->file_map.empty()); - nodes.dir_map.erase(q); + dir_map.erase(q); } } break; @@ -1436,8 +1439,8 @@ int BlueFS::_replay(bool noop, bool to_stdout) } if (!noop) { - auto p = nodes.file_map.find(ino); - ceph_assert(p != nodes.file_map.end()); + auto p = file_map.find(ino); + ceph_assert(p != file_map.end()); vselector->sub_usage(p->second->vselector_hint, p->second->fnode); if (cct->_conf->bluefs_log_replay_check_allocations) { int r = _check_allocations(p->second->fnode, @@ -1446,7 +1449,7 @@ int BlueFS::_replay(bool noop, bool to_stdout) return r; } } - nodes.file_map.erase(p); + file_map.erase(p); } } break; @@ -1466,11 +1469,6 @@ int BlueFS::_replay(bool noop, bool to_stdout) } if (!noop) { vselector->add_usage(log_file->vselector_hint, log_file->fnode); - log.seq_live = log_seq + 1; - dirty.seq_live = log_seq + 1; - log.t.seq = log.seq_live; - //log.seq_stable = log_seq; - dirty.seq_stable = log_seq; } dout(10) << __func__ << " log file size was 0x" @@ -1484,7 +1482,7 @@ int BlueFS::_replay(bool noop, bool to_stdout) if (!noop) { // verify file link counts are all >0 - for (auto& p : nodes.file_map) { + for (auto& p : file_map) { if (p.second->refs == 0 && p.second->fnode.ino > 1) { derr << __func__ << " file with link count 0: " << p.second->fnode @@ -1501,7 +1499,7 @@ int BlueFS::_replay(bool noop, bool to_stdout) int BlueFS::log_dump() { // only dump log file's content - ceph_assert(log.writer == nullptr && "cannot log_dump on mounted BlueFS"); + ceph_assert(log_writer == nullptr && "cannot log_dump on mounted BlueFS"); int r = _open_super(); if (r < 0) { derr << __func__ << " failed to open super: " << cpp_strerror(r) << dendl; @@ -1543,7 +1541,7 @@ int BlueFS::device_migrate_to_existing( dout(0) << __func__ << " super to be written to " << dev_target << dendl; } - for (auto& [ino, file_ref] : nodes.file_map) { + for (auto& [ino, file_ref] : file_map) { //do not copy log if (file_ref->fnode.ino == 1) { continue; @@ -1650,7 +1648,7 @@ int BlueFS::device_migrate_to_existing( new_log_dev_next; } - _rewrite_log_and_layout_sync_LN_LD( + _rewrite_log_and_layout_sync( false, (flags & REMOVE_DB) ? BDEV_SLOW : BDEV_DB, new_log_dev_cur, @@ -1681,7 +1679,7 @@ int BlueFS::device_migrate_to_new( flags |= devs_source.count(BDEV_WAL) ? REMOVE_WAL : 0; int dev_target_new = dev_target; //FIXME: remove, makes no sense - for (auto& p : nodes.file_map) { + for (auto& p : file_map) { //do not copy log if (p.second->fnode.ino == 1) { continue; @@ -1785,7 +1783,7 @@ int BlueFS::device_migrate_to_new( BDEV_DB : BDEV_SLOW; - _rewrite_log_and_layout_sync_LN_LD( + _rewrite_log_and_layout_sync( false, super_dev, new_log_dev_cur, @@ -1797,10 +1795,10 @@ int BlueFS::device_migrate_to_new( BlueFS::FileRef BlueFS::_get_file(uint64_t ino) { - auto p = nodes.file_map.find(ino); - if (p == nodes.file_map.end()) { + auto p = file_map.find(ino); + if (p == file_map.end()) { FileRef f = ceph::make_ref(); - nodes.file_map[ino] = f; + file_map[ino] = f; dout(30) << __func__ << " ino " << ino << " = " << f << " (new)" << dendl; return f; @@ -1810,33 +1808,29 @@ BlueFS::FileRef BlueFS::_get_file(uint64_t ino) } } -void BlueFS::_drop_link_D(FileRef file) +void BlueFS::_drop_link(FileRef file) { dout(20) << __func__ << " had refs " << file->refs << " on " << file->fnode << dendl; ceph_assert(file->refs > 0); - ceph_assert(ceph_mutex_is_locked(log.lock)); - --file->refs; if (file->refs == 0) { dout(20) << __func__ << " destroying " << file->fnode << dendl; ceph_assert(file->num_reading.load() == 0); vselector->sub_usage(file->vselector_hint, file->fnode); - log.t.op_file_remove(file->fnode.ino); - - std::lock_guard dl(dirty.lock); + log_t.op_file_remove(file->fnode.ino); for (auto& r : file->fnode.extents) { pending_release[r.bdev].insert(r.offset, r.length); } - nodes.file_map.erase(file->fnode.ino); + file_map.erase(file->fnode.ino); file->deleted = true; - if (file->dirty_seq > dirty.seq_stable) { - // retract request to serialize changes - ceph_assert(dirty.files.count(file->dirty_seq)); - auto it = dirty.files[file->dirty_seq].iterator_to(*file); - dirty.files[file->dirty_seq].erase(it); - file->dirty_seq = dirty.seq_stable; + if (file->dirty_seq) { + ceph_assert(file->dirty_seq > log_seq_stable); + ceph_assert(dirty_files.count(file->dirty_seq)); + auto it = dirty_files[file->dirty_seq].iterator_to(*file); + dirty_files[file->dirty_seq].erase(it); + file->dirty_seq = 0; } } } @@ -2055,9 +2049,8 @@ int64_t BlueFS::_read( return ret; } -void BlueFS::invalidate_cache(FileRef f, uint64_t offset, uint64_t length) +void BlueFS::_invalidate_cache(FileRef f, uint64_t offset, uint64_t length) { - std::lock_guard l(f->lock); dout(10) << __func__ << " file " << f->fnode << " 0x" << std::hex << offset << "~" << length << std::dec << dendl; @@ -2077,68 +2070,60 @@ void BlueFS::invalidate_cache(FileRef f, uint64_t offset, uint64_t length) } } -uint64_t BlueFS::_estimate_log_size_N() +uint64_t BlueFS::_estimate_log_size() { - std::lock_guard nl(nodes.lock); int avg_dir_size = 40; // fixme int avg_file_size = 12; uint64_t size = 4096 * 2; - size += nodes.file_map.size() * (1 + sizeof(bluefs_fnode_t)); - size += nodes.dir_map.size() + (1 + avg_dir_size); - size += nodes.file_map.size() * (1 + avg_dir_size + avg_file_size); + size += file_map.size() * (1 + sizeof(bluefs_fnode_t)); + size += dir_map.size() + (1 + avg_dir_size); + size += file_map.size() * (1 + avg_dir_size + avg_file_size); return round_up_to(size, super.block_size); } void BlueFS::compact_log() { + std::unique_lock l(lock); if (!cct->_conf->bluefs_replay_recovery_disable_compact) { if (cct->_conf->bluefs_compact_log_sync) { - _compact_log_sync_LN_LD(); + _compact_log_sync(); } else { - _compact_log_async_LD_NF_D(); + _compact_log_async(l); } } } -bool BlueFS::_should_start_compact_log_L_N() +bool BlueFS::_should_compact_log() { - if (log_is_compacting.load() == true) { - // compaction is already running - return false; - } - uint64_t current; - { - std::lock_guard ll(log.lock); - current = log.writer->file->fnode.size; - } - uint64_t expected = _estimate_log_size_N(); + uint64_t current = log_writer->file->fnode.size; + uint64_t expected = _estimate_log_size(); float ratio = (float)current / (float)expected; dout(10) << __func__ << " current 0x" << std::hex << current << " expected " << expected << std::dec << " ratio " << ratio + << (new_log ? " (async compaction in progress)" : "") << dendl; - if (current < cct->_conf->bluefs_log_compact_min_size || + if (new_log || + current < cct->_conf->bluefs_log_compact_min_size || ratio < cct->_conf->bluefs_log_compact_min_ratio) { return false; } return true; } -void BlueFS::_compact_log_dump_metadata_N(bluefs_transaction_t *t, +void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t, int flags) { - std::lock_guard nl(nodes.lock); - t->seq = 1; t->uuid = super.uuid; dout(20) << __func__ << " op_init" << dendl; t->op_init(); - for (auto& [ino, file_ref] : nodes.file_map) { + for (auto& [ino, file_ref] : file_map) { if (ino == 1) continue; ceph_assert(ino > 1); - //AK - TODO - touching fnode - need to lock + for(auto& e : file_ref->fnode.extents) { auto bdev = e.bdev; auto bdev_new = bdev; @@ -2164,41 +2149,7 @@ void BlueFS::_compact_log_dump_metadata_N(bluefs_transaction_t *t, dout(20) << __func__ << " op_file_update " << file_ref->fnode << dendl; t->op_file_update(file_ref->fnode); } - for (auto& [path, dir_ref] : nodes.dir_map) { - dout(20) << __func__ << " op_dir_create " << path << dendl; - t->op_dir_create(path); - for (auto& [fname, file_ref] : dir_ref->file_map) { - dout(20) << __func__ << " op_dir_link " << path << "/" << fname - << " to " << file_ref->fnode.ino << dendl; - t->op_dir_link(path, fname, file_ref->fnode.ino); - } - } -} -/* Streams to t files modified before *capture_before_seq* and all dirs */ -void BlueFS::compact_log_async_dump_metadata_NF(bluefs_transaction_t *t, - uint64_t capture_before_seq) -{ - std::lock_guard nl(nodes.lock); - - t->seq = 1; - t->uuid = super.uuid; - dout(20) << __func__ << " op_init" << dendl; - - t->op_init(); - for (auto& [ino, file_ref] : nodes.file_map) { - if (ino == 1) - continue; - ceph_assert(ino > 1); - std::lock_guard fl(file_ref->lock); - if (file_ref->dirty_seq < capture_before_seq) { - dout(20) << __func__ << " op_file_update " << file_ref->fnode << dendl; - t->op_file_update(file_ref->fnode); - } else { - dout(20) << __func__ << " skipping just modified, dirty_seq=" - << file_ref->dirty_seq << " " << file_ref->fnode << dendl; - } - } - for (auto& [path, dir_ref] : nodes.dir_map) { + for (auto& [path, dir_ref] : dir_map) { dout(20) << __func__ << " op_dir_create " << path << dendl; t->op_dir_create(path); for (auto& [fname, file_ref] : dir_ref->file_map) { @@ -2209,12 +2160,12 @@ void BlueFS::compact_log_async_dump_metadata_NF(bluefs_transaction_t *t, } } -void BlueFS::_compact_log_sync_LN_LD() +void BlueFS::_compact_log_sync() { dout(10) << __func__ << dendl; auto prefer_bdev = - vselector->select_prefer_bdev(log.writer->file->vselector_hint); - _rewrite_log_and_layout_sync_LN_LD(true, + vselector->select_prefer_bdev(log_writer->file->vselector_hint); + _rewrite_log_and_layout_sync(true, BDEV_DB, prefer_bdev, prefer_bdev, @@ -2223,24 +2174,17 @@ void BlueFS::_compact_log_sync_LN_LD() logger->inc(l_bluefs_log_compactions); } -void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback, - int super_dev, - int log_dev, - int log_dev_new, - int flags, - std::optional layout) +void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback, + int super_dev, + int log_dev, + int log_dev_new, + int flags, + std::optional layout) { - std::lock_guard ll(log.lock); + File *log_file = log_writer->file.get(); - File *log_file = log.writer->file.get(); - - // log.t.seq is always set to current live seq - ceph_assert(log.t.seq == log.seq_live); - // Capturing entire state. Dump anything that has been stored there. - log.t.clear(); - log.t.seq = log.seq_live; - // From now on, no changes to log.t are permitted until we finish rewriting log. - // Can allow dirty to remain dirty - log.seq_live will not change. + // clear out log (be careful who calls us!!!) + log_t.clear(); dout(20) << __func__ << " super_dev:" << super_dev << " log_dev:" << log_dev @@ -2248,10 +2192,10 @@ void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback, << " flags:" << flags << dendl; bluefs_transaction_t t; - _compact_log_dump_metadata_N(&t, flags); + _compact_log_dump_metadata(&t, flags); - dout(20) << __func__ << " op_jump_seq " << log.seq_live << dendl; - t.op_jump_seq(log.seq_live); + dout(20) << __func__ << " op_jump_seq " << log_seq << dendl; + t.op_jump_seq(log_seq); bufferlist bl; encode(t, bl); @@ -2278,25 +2222,25 @@ void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback, } } - _close_writer(log.writer); + _close_writer(log_writer); log_file->fnode.size = bl.length(); vselector->sub_usage(log_file->vselector_hint, old_fnode); vselector->add_usage(log_file->vselector_hint, log_file->fnode); - log.writer = _create_writer(log_file); - log.writer->append(bl); - r = _flush_special(log.writer); + log_writer = _create_writer(log_file); + log_writer->append(bl); + r = _flush(log_writer, true); ceph_assert(r == 0); #ifdef HAVE_LIBAIO if (!cct->_conf->bluefs_sync_write) { list completed_ios; - _claim_completed_aios(log.writer, &completed_ios); - _wait_for_aio(log.writer); + _claim_completed_aios(log_writer, &completed_ios); + wait_for_aio(log_writer); completed_ios.clear(); } #endif - _flush_bdev(); + flush_bdev(); super.memorized_layout = layout; super.log_fnode = log_file->fnode; @@ -2311,10 +2255,9 @@ void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback, ++super.version; _write_super(super_dev); - _flush_bdev(); + flush_bdev(); dout(10) << __func__ << " release old log extents " << old_fnode.extents << dendl; - std::lock_guard dl(dirty.lock); for (auto& r : old_fnode.extents) { pending_release[r.bdev].insert(r.offset, r.length); } @@ -2342,44 +2285,30 @@ void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback, * * 8. Release the old log space. Clean up. */ - -void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer +void BlueFS::_compact_log_async(std::unique_lock& l) { dout(10) << __func__ << dendl; - // only one compaction allowed at one time - bool old_is_comp = std::atomic_exchange(&log_is_compacting, true); - if (old_is_comp) { - dout(10) << __func__ << " ongoing" <file.get(); - FileWriter *new_log_writer = nullptr; - FileRef new_log = nullptr; - uint64_t new_log_jump_to = 0; - uint64_t old_log_jump_to = 0; + File *log_file = log_writer->file.get(); + ceph_assert(!new_log); + ceph_assert(!new_log_writer); + // create a new log [writer] so that we know compaction is in progress + // (see _should_compact_log) new_log = ceph::make_ref(); - new_log->fnode.ino = 0; // we use _flush_special to avoid log of the fnode - - // Part 1. - // Prepare current log for jumping into it. - // 1. Allocate extent - // 2. Update op to log - // 3. Jump op to log - // During that, no one else can write to log, otherwise we risk jumping backwards. - // We need to sync log, because we are injecting discontinuity, and writer is not prepared for that. + new_log->fnode.ino = 0; // so that _flush_range won't try to log the fnode - //signal _maybe_extend_log that expansion of log is temporary inacceptable - bool old_forbidden = atomic_exchange(&log_forbidden_to_expand, true); - ceph_assert(old_forbidden == false); + // 0. wait for any racing flushes to complete. (We do not want to block + // in _flush_sync_log with jump_to set or else a racing thread might flush + // our entries and our jump_to update won't be correct.) + while (log_flushing) { + dout(10) << __func__ << " log is currently flushing, waiting" << dendl; + log_cond.wait(l); + } vselector->sub_usage(log_file->vselector_hint, log_file->fnode); - // 1.1 allocate new log space and jump to it. + // 1. allocate new log space and jump to it. old_log_jump_to = log_file->fnode.get_allocated(); - uint64_t runway = log_file->fnode.get_allocated() - log.writer->get_effective_write_pos(); dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to << " need 0x" << (old_log_jump_to + cct->_conf->bluefs_max_log_runway) << std::dec << dendl; int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint), @@ -2392,22 +2321,19 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer // update the log file change and log a jump to the offset where we want to // write the new entries - log.t.op_file_update(log_file->fnode); - // jump to new position should mean next seq - log.t.op_jump(log.seq_live + 1, old_log_jump_to); - uint64_t seq_now = log.seq_live; - // we need to flush all bdev because we will be streaming all dirty files to log - // TODO - think - if _flush_and_sync_log_jump will not add dirty files nor release pending allocations - // then flush_bdev() will not be necessary - _flush_bdev(); - _flush_and_sync_log_jump_D(old_log_jump_to, runway); - - log.lock.unlock(); - // out of jump section - now log can be used to write to + log_t.op_file_update(log_file->fnode); + log_t.op_jump(log_seq, old_log_jump_to); + + flush_bdev(); // FIXME? + + _flush_and_sync_log(l, 0, old_log_jump_to); // 2. prepare compacted log bluefs_transaction_t t; - compact_log_async_dump_metadata_NF(&t, seq_now); + //avoid record two times in log_t and _compact_log_dump_metadata. + log_t.clear(); + _compact_log_dump_metadata(&t, 0); + uint64_t max_alloc_size = std::max(alloc_size[BDEV_WAL], std::max(alloc_size[BDEV_DB], alloc_size[BDEV_SLOW])); @@ -2415,8 +2341,7 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer // conservative estimate for final encoded size new_log_jump_to = round_up_to(t.op_bl.length() + super.block_size * 2, max_alloc_size); - //newly constructed log head will jump to what we had before - t.op_jump(seq_now, new_log_jump_to); + t.op_jump(log_seq, new_log_jump_to); // allocate //FIXME: check if we want DB here? @@ -2425,7 +2350,7 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer ceph_assert(r == 0); // we might have some more ops in log_t due to _allocate call - // t.claim_ops(log_t); no we no longer track allocations in log + t.claim_ops(log_t); bufferlist bl; encode(t, bl); @@ -2435,18 +2360,15 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer << std::dec << dendl; new_log_writer = _create_writer(new_log); - new_log_writer->append(bl); - new_log->lock.lock(); - new_log_writer->lock.lock(); + // 3. flush - r = _flush_special(new_log_writer); + r = _flush(new_log_writer, true); ceph_assert(r == 0); // 4. wait - _flush_bdev(new_log_writer); - new_log_writer->lock.unlock(); - new_log->lock.unlock(); + _flush_bdev_safely(new_log_writer); + // 5. update our log fnode // discard first old_log_jump_to extents @@ -2487,8 +2409,8 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer // swap the log files. New log file is the log file now. new_log->fnode.swap_extents(log_file->fnode); - log.writer->pos = log.writer->file->fnode.size = - log.writer->pos - old_log_jump_to + new_log_jump_to; + log_writer->pos = log_writer->file->fnode.size = + log_writer->pos - old_log_jump_to + new_log_jump_to; vselector->add_usage(log_file->vselector_hint, log_file->fnode); @@ -2498,33 +2420,29 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer ++super.version; _write_super(BDEV_DB); - _flush_bdev(); - - old_forbidden = atomic_exchange(&log_forbidden_to_expand, false); - ceph_assert(old_forbidden == true); - //to wake up if someone was in need of expanding log - log_cond.notify_all(); + lock.unlock(); + flush_bdev(); + lock.lock(); // 7. release old space dout(10) << __func__ << " release old log extents " << old_extents << dendl; - { - std::lock_guard dl(dirty.lock); - for (auto& r : old_extents) { - pending_release[r.bdev].insert(r.offset, r.length); - } + for (auto& r : old_extents) { + pending_release[r.bdev].insert(r.offset, r.length); } // delete the new log, remove from the dirty files list _close_writer(new_log_writer); + if (new_log->dirty_seq) { + ceph_assert(dirty_files.count(new_log->dirty_seq)); + auto it = dirty_files[new_log->dirty_seq].iterator_to(*new_log); + dirty_files[new_log->dirty_seq].erase(it); + } new_log_writer = nullptr; new_log = nullptr; log_cond.notify_all(); dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl; logger->inc(l_bluefs_log_compactions); - - old_is_comp = atomic_exchange(&log_is_compacting, false); - ceph_assert(old_is_comp); } void BlueFS::_pad_bl(bufferlist& bl) @@ -2537,108 +2455,76 @@ void BlueFS::_pad_bl(bufferlist& bl) } } -/** - * Adds file modifications from `dirty.files` to bluefs transactions - * already stored in `log.t`. Writes them to disk and waits until are stable. - * Guarantees that file modifications with `want_seq` are already stable on disk. - * In addition may insert jump forward transaction to log write position `jump_to`. - * - * it should lock ability to: - * 1) add to log.t - * 2) modify dirty.files - * 3) add to pending_release - * - * pending_release and log.t go with same lock - */ -// Returns log seq that was live before advance. -uint64_t BlueFS::_log_advance_seq() +int BlueFS::_flush_and_sync_log(std::unique_lock& l, + uint64_t want_seq, + uint64_t jump_to) { - ceph_assert(ceph_mutex_is_locked(dirty.lock)); - ceph_assert(ceph_mutex_is_locked(log.lock)); - //acquire new seq - // this will became seq_stable once we write - ceph_assert(dirty.seq_stable < dirty.seq_live); - ceph_assert(log.t.seq == log.seq_live); - uint64_t seq = log.seq_live; - log.t.uuid = super.uuid; - - ++dirty.seq_live; - ++log.seq_live; - ceph_assert(dirty.seq_live == log.seq_live); - return seq; -} + while (log_flushing) { + dout(10) << __func__ << " want_seq " << want_seq + << " log is currently flushing, waiting" << dendl; + ceph_assert(!jump_to); + log_cond.wait(l); + } + if (want_seq && want_seq <= log_seq_stable) { + dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable " + << log_seq_stable << ", done" << dendl; + ceph_assert(!jump_to); + return 0; + } + if (log_t.empty() && dirty_files.empty()) { + dout(10) << __func__ << " want_seq " << want_seq + << " " << log_t << " not dirty, dirty_files empty, no-op" << dendl; + ceph_assert(!jump_to); + return 0; + } + vector> to_release(pending_release.size()); + to_release.swap(pending_release); -// Adds to log.t file modifications mentioned in `dirty.files`. -// Note: some bluefs ops may have already been stored in log.t transaction. -void BlueFS::_consume_dirty(uint64_t seq) -{ - ceph_assert(ceph_mutex_is_locked(dirty.lock)); - ceph_assert(ceph_mutex_is_locked(log.lock)); + uint64_t seq = log_t.seq = ++log_seq; + ceph_assert(want_seq == 0 || want_seq <= seq); + log_t.uuid = super.uuid; // log dirty files - // we just incremented log_seq. It is now illegal to add to dirty.files[log_seq] - auto lsi = dirty.files.find(seq); - if (lsi != dirty.files.end()) { - dout(20) << __func__ << " " << lsi->second.size() << " dirty.files" << dendl; + auto lsi = dirty_files.find(seq); + if (lsi != dirty_files.end()) { + dout(20) << __func__ << " " << lsi->second.size() << " dirty_files" << dendl; for (auto &f : lsi->second) { dout(20) << __func__ << " op_file_update " << f.fnode << dendl; - log.t.op_file_update(f.fnode); + log_t.op_file_update(f.fnode); } } -} -// Extends log if its free space is smaller then bluefs_min_log_runway. -// Returns space available *BEFORE* adding new space. Signed for additional <0 detection. -int64_t BlueFS::_maybe_extend_log() -{ - ceph_assert(ceph_mutex_is_locked(log.lock)); + dout(10) << __func__ << " " << log_t << dendl; + ceph_assert(!log_t.empty()); + // allocate some more space (before we run out)? - // BTW: this triggers `flush()` in the `page_aligned_appender` of `log.writer`. - int64_t runway = log.writer->file->fnode.get_allocated() - - log.writer->get_effective_write_pos(); + // BTW: this triggers `flush()` in the `page_aligned_appender` of `log_writer`. + int64_t runway = log_writer->file->fnode.get_allocated() - + log_writer->get_effective_write_pos(); + bool just_expanded_log = false; if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) { dout(10) << __func__ << " allocating more log runway (0x" << std::hex << runway << std::dec << " remaining)" << dendl; - /* - * Usually, when we are low on space in log, we just allocate new extent, - * put update op(log) to log and we are fine. - * Problem - it interferes with log compaction: - * New log produced in compaction will include - as last op - jump into some offset (anchor) of current log. - * It is assumed that log region (anchor - end) will contain all changes made by bluefs since - * full state capture into new log. - * Putting log update into (anchor - end) region is illegal, because any update there must be compatible with - * both logs, but old log is different then new log. - * - * Possible solutions: - * - stall extending log until we finish compacting and switch log (CURRENT) - * - re-run compaction with more runway for old log - * - add OP_FILE_ADDEXT that adds extent; will be compatible with both logs - */ - if (log_forbidden_to_expand.load() == true) { - return -EWOULDBLOCK; + while (new_log_writer) { + dout(10) << __func__ << " waiting for async compaction" << dendl; + log_cond.wait(l); } - vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode); + vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode); int r = _allocate( - vselector->select_prefer_bdev(log.writer->file->vselector_hint), + vselector->select_prefer_bdev(log_writer->file->vselector_hint), cct->_conf->bluefs_max_log_runway, - &log.writer->file->fnode); + &log_writer->file->fnode); ceph_assert(r == 0); - vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode); - log.t.op_file_update(log.writer->file->fnode); + vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode); + log_t.op_file_update(log_writer->file->fnode); + just_expanded_log = true; } - return runway; -} - -void BlueFS::_flush_and_sync_log_core(int64_t runway) -{ - ceph_assert(ceph_mutex_is_locked(log.lock)); - dout(10) << __func__ << " " << log.t << dendl; bufferlist bl; bl.reserve(super.block_size); - encode(log.t, bl); + encode(log_t, bl); // pad to block boundary size_t realign = super.block_size - (bl.length() % super.block_size); if (realign && realign != super.block_size) @@ -2646,35 +2532,41 @@ void BlueFS::_flush_and_sync_log_core(int64_t runway) logger->inc(l_bluefs_logged_bytes, bl.length()); - if (true) { + if (just_expanded_log) { ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss - // transaction will not fit extents before growth -> data loss on _replay } - log.writer->append(bl); + log_writer->append(bl); - // prepare log for new transactions - log.t.clear(); - log.t.seq = log.seq_live; + log_t.clear(); + log_t.seq = 0; // just so debug output is less confusing + log_flushing = true; - int r = _flush_special(log.writer); + int r = _flush(log_writer, true); ceph_assert(r == 0); -} -// Clears dirty.files up to (including) seq_stable. -void BlueFS::_clear_dirty_set_stable_D(uint64_t seq) -{ - std::lock_guard dl(dirty.lock); + if (jump_to) { + dout(10) << __func__ << " jumping log offset from 0x" << std::hex + << log_writer->pos << " -> 0x" << jump_to << std::dec << dendl; + log_writer->pos = jump_to; + vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size); + log_writer->file->fnode.size = jump_to; + vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size); + } + + _flush_bdev_safely(log_writer); + + log_flushing = false; + log_cond.notify_all(); // clean dirty files - if (seq > dirty.seq_stable) { - dirty.seq_stable = seq; - dout(20) << __func__ << " seq_stable " << dirty.seq_stable << dendl; - - // undirty all files that were already streamed to log - auto p = dirty.files.begin(); - while (p != dirty.files.end()) { - if (p->first > dirty.seq_stable) { + if (seq > log_seq_stable) { + log_seq_stable = seq; + dout(20) << __func__ << " log_seq_stable " << log_seq_stable << dendl; + + auto p = dirty_files.begin(); + while (p != dirty_files.end()) { + if (p->first > log_seq_stable) { dout(20) << __func__ << " done cleaning up dirty files" << dendl; break; } @@ -2682,24 +2574,22 @@ void BlueFS::_clear_dirty_set_stable_D(uint64_t seq) auto l = p->second.begin(); while (l != p->second.end()) { File *file = &*l; - ceph_assert(file->dirty_seq <= dirty.seq_stable); + ceph_assert(file->dirty_seq > 0); + ceph_assert(file->dirty_seq <= log_seq_stable); dout(20) << __func__ << " cleaned file " << file->fnode << dendl; - file->dirty_seq = dirty.seq_stable; + file->dirty_seq = 0; p->second.erase(l++); } ceph_assert(p->second.empty()); - dirty.files.erase(p++); + dirty_files.erase(p++); } } else { - dout(20) << __func__ << " seq_stable " << dirty.seq_stable + dout(20) << __func__ << " log_seq_stable " << log_seq_stable << " already >= out seq " << seq << ", we lost a race against another log flush, done" << dendl; } -} -void BlueFS::_release_pending_allocations(vector>& to_release) -{ for (unsigned i = 0; i < to_release.size(); ++i) { if (!to_release[i].empty()) { /* OK, now we have the guarantee alloc[i] won't be null. */ @@ -2719,85 +2609,9 @@ void BlueFS::_release_pending_allocations(vector>& to_rel } } } -} - -int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq) -{ - int64_t available_runway; - do { - log.lock.lock(); - dirty.lock.lock(); - if (want_seq && want_seq <= dirty.seq_stable) { - dout(10) << __func__ << " want_seq " << want_seq << " <= seq_stable " - << dirty.seq_stable << ", done" << dendl; - dirty.lock.unlock(); - log.lock.unlock(); - return 0; - } - - available_runway = _maybe_extend_log(); - if (available_runway == -EWOULDBLOCK) { - // we are in need of adding runway, but we are during log-switch from compaction - dirty.lock.unlock(); - //instead log.lock_unlock() do move ownership - std::unique_lock ll(log.lock, std::adopt_lock); - while (log_forbidden_to_expand.load()) { - log_cond.wait(ll); - } - } else { - ceph_assert(available_runway >= 0); - } - } while (available_runway < 0); - - ceph_assert(want_seq == 0 || want_seq <= dirty.seq_live); // illegal to request seq that was not created yet - uint64_t seq =_log_advance_seq(); - _consume_dirty(seq); - vector> to_release(pending_release.size()); - to_release.swap(pending_release); - dirty.lock.unlock(); - - _flush_and_sync_log_core(available_runway); - _flush_bdev(log.writer); - //now log.lock is no longer needed - log.lock.unlock(); - - _clear_dirty_set_stable_D(seq); - _release_pending_allocations(to_release); _update_logger_stats(); - return 0; -} - -// Flushes log and immediately adjusts log_writer pos. -int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to, - int64_t available_runway) -{ - ceph_assert(ceph_mutex_is_locked(log.lock)); - - ceph_assert(jump_to); - // we synchronize writing to log, by lock to log.lock - dirty.lock.lock(); - uint64_t seq =_log_advance_seq(); - _consume_dirty(seq); - vector> to_release(pending_release.size()); - to_release.swap(pending_release); - dirty.lock.unlock(); - _flush_and_sync_log_core(available_runway); - - dout(10) << __func__ << " jumping log offset from 0x" << std::hex - << log.writer->pos << " -> 0x" << jump_to << std::dec << dendl; - log.writer->pos = jump_to; - vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode.size); - log.writer->file->fnode.size = jump_to; - vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode.size); - - _flush_bdev(log.writer); - - _clear_dirty_set_stable_D(seq); - _release_pending_allocations(to_release); - - _update_logger_stats(); return 0; } @@ -2807,7 +2621,6 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer( const unsigned length, const bluefs_super_t& super) { - ceph_assert(ceph_mutex_is_locked(this->lock) || file->fnode.ino == 1); ceph::bufferlist bl; if (partial) { tail_block.splice(0, tail_block.length(), &bl); @@ -2844,43 +2657,35 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer( return bl; } -int BlueFS::_signal_dirty_to_log_D(FileWriter *h) +int BlueFS::_signal_dirty_to_log(FileWriter *h) { - ceph_assert(ceph_mutex_is_locked(h->lock)); - std::lock_guard dl(dirty.lock); h->file->fnode.mtime = ceph_clock_now(); ceph_assert(h->file->fnode.ino >= 1); - if (h->file->dirty_seq <= dirty.seq_stable) { - h->file->dirty_seq = dirty.seq_live; - dirty.files[h->file->dirty_seq].push_back(*h->file); - dout(20) << __func__ << " dirty_seq = " << dirty.seq_live + if (h->file->dirty_seq == 0) { + h->file->dirty_seq = log_seq + 1; + dirty_files[h->file->dirty_seq].push_back(*h->file); + dout(20) << __func__ << " dirty_seq = " << log_seq + 1 << " (was clean)" << dendl; } else { - if (h->file->dirty_seq != dirty.seq_live) { + if (h->file->dirty_seq != log_seq + 1) { // need re-dirty, erase from list first - ceph_assert(dirty.files.count(h->file->dirty_seq)); - auto it = dirty.files[h->file->dirty_seq].iterator_to(*h->file); - dirty.files[h->file->dirty_seq].erase(it); - h->file->dirty_seq = dirty.seq_live; - dirty.files[h->file->dirty_seq].push_back(*h->file); - dout(20) << __func__ << " dirty_seq = " << dirty.seq_live + ceph_assert(dirty_files.count(h->file->dirty_seq)); + auto it = dirty_files[h->file->dirty_seq].iterator_to(*h->file); + dirty_files[h->file->dirty_seq].erase(it); + h->file->dirty_seq = log_seq + 1; + dirty_files[h->file->dirty_seq].push_back(*h->file); + dout(20) << __func__ << " dirty_seq = " << log_seq + 1 << " (was " << h->file->dirty_seq << ")" << dendl; } else { - dout(20) << __func__ << " dirty_seq = " << dirty.seq_live + dout(20) << __func__ << " dirty_seq = " << log_seq + 1 << " (unchanged, do nothing) " << dendl; } } return 0; } -void BlueFS::flush_range/*WF*/(FileWriter *h, uint64_t offset, uint64_t length) { - std::unique_lock hl(h->lock); - _flush_range_F(h, offset, length); -} - -int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length) +int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length) { - ceph_assert(ceph_mutex_is_locked(h->lock)); dout(10) << __func__ << " " << h << " pos 0x" << std::hex << h->pos << " 0x" << offset << "~" << length << std::dec << " to " << h->file->fnode << dendl; @@ -2891,8 +2696,11 @@ int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length) ceph_assert(h->file->num_readers.load() == 0); - bool buffered = cct->_conf->bluefs_buffered_io; - ceph_assert(h->file->fnode.ino > 1); + bool buffered; + if (h->file->fnode.ino == 1) + buffered = false; + else + buffered = cct->_conf->bluefs_buffered_io; if (offset + length <= h->pos) return 0; @@ -2903,16 +2711,17 @@ int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length) << std::hex << offset << "~" << length << std::dec << dendl; } - std::lock_guard file_lock(h->file->lock); ceph_assert(offset <= h->file->fnode.size); uint64_t allocated = h->file->fnode.get_allocated(); vselector->sub_usage(h->file->vselector_hint, h->file->fnode); // do not bother to dirty the file if we are overwriting // previously allocated extents. + if (allocated < offset + length) { // we should never run out of log space here; see the min runway check // in _flush_and_sync_log. + ceph_assert(h->file->fnode.ino != 1); int r = _allocate(vselector->select_prefer_bdev(h->file->vselector_hint), offset + length - allocated, &h->file->fnode); @@ -2928,19 +2737,15 @@ int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length) } if (h->file->fnode.size < offset + length) { h->file->fnode.size = offset + length; - h->file->is_dirty = true; + if (h->file->fnode.ino > 1) { + // we do not need to dirty the log file (or it's compacting + // replacement) when the file size changes because replay is + // smart enough to discover it on its own. + h->file->is_dirty = true; + } } - dout(20) << __func__ << " file now, unflushed " << h->file->fnode << dendl; - return _flush_data(h, offset, length, buffered); -} -int BlueFS::_flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered) -{ - if (h->file->fnode.ino != 1) { - ceph_assert(ceph_mutex_is_locked(h->lock)); - ceph_assert(ceph_mutex_is_locked(h->file->lock)); - } uint64_t x_off = 0; auto p = h->file->fnode.seek(offset, &x_off); ceph_assert(p != h->file->fnode.extents.end()); @@ -3030,7 +2835,7 @@ void BlueFS::_claim_completed_aios(FileWriter *h, list *ls) dout(10) << __func__ << " got " << ls->size() << " aios" << dendl; } -void BlueFS::_wait_for_aio(FileWriter *h) +void BlueFS::wait_for_aio(FileWriter *h) { // NOTE: this is safe to call without a lock, as long as our reference is // stable. @@ -3047,55 +2852,18 @@ void BlueFS::_wait_for_aio(FileWriter *h) } #endif -void BlueFS::append_try_flush(FileWriter *h, const char* buf, size_t len) -{ - bool flushed_sum = false; - { - std::unique_lock hl(h->lock); - size_t max_size = 1ull << 30; // cap to 1GB - while (len > 0) { - bool need_flush = true; - auto l0 = h->get_buffer_length(); - if (l0 < max_size) { - size_t l = std::min(len, max_size - l0); - h->append(buf, l); - buf += l; - len -= l; - need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size; - } - if (need_flush) { - bool flushed = false; - int r = _flush_F(h, true, &flushed); - ceph_assert(r == 0); - flushed_sum |= flushed; - // make sure we've made any progress with flush hence the - // loop doesn't iterate forever - ceph_assert(h->get_buffer_length() < max_size); - } - } - } - if (flushed_sum) { - _maybe_compact_log_LN_NF_LD_D(); - } -} - -void BlueFS::flush(FileWriter *h, bool force) +int BlueFS::_flush(FileWriter *h, bool force, std::unique_lock& l) { bool flushed = false; - int r; - { - std::unique_lock hl(h->lock); - r = _flush_F(h, force, &flushed); - ceph_assert(r == 0); - } + int r = _flush(h, force, &flushed); if (r == 0 && flushed) { - _maybe_compact_log_LN_NF_LD_D(); + _maybe_compact_log(l); } + return r; } -int BlueFS::_flush_F(FileWriter *h, bool force, bool *flushed) +int BlueFS::_flush(FileWriter *h, bool force, bool *flushed) { - ceph_assert(ceph_mutex_is_locked(h->lock)); uint64_t length = h->get_buffer_length(); uint64_t offset = h->pos; if (flushed) { @@ -3117,34 +2885,15 @@ int BlueFS::_flush_F(FileWriter *h, bool force, bool *flushed) << std::hex << offset << "~" << length << std::dec << " to " << h->file->fnode << dendl; ceph_assert(h->pos <= h->file->fnode.size); - int r = _flush_range_F(h, offset, length); + int r = _flush_range(h, offset, length); if (flushed) { *flushed = true; } return r; } -// Flush for bluefs special files. -// Does not add extents to h. -// Does not mark h as dirty. -// we do not need to dirty the log file (or it's compacting -// replacement) when the file size changes because replay is -// smart enough to discover it on its own. -int BlueFS::_flush_special(FileWriter *h) +int BlueFS::_truncate(FileWriter *h, uint64_t offset) { - ceph_assert(h->file->fnode.ino <= 1); - uint64_t length = h->get_buffer_length(); - uint64_t offset = h->pos; - ceph_assert(length + offset <= h->file->fnode.get_allocated()); - if (h->file->fnode.size < offset + length) { - h->file->fnode.size = offset + length; - } - return _flush_data(h, offset, length, false); -} - -int BlueFS::truncate(FileWriter *h, uint64_t offset) -{ - std::lock_guard hl(h->lock); dout(10) << __func__ << " 0x" << std::hex << offset << std::dec << " file " << h->file->fnode << dendl; if (h->file->deleted) { @@ -3163,7 +2912,7 @@ int BlueFS::truncate(FileWriter *h, uint64_t offset) ceph_abort_msg("actually this shouldn't happen"); } if (h->get_buffer_length()) { - int r = _flush_F(h, true); + int r = _flush(h, true); if (r < 0) return r; } @@ -3177,63 +2926,58 @@ int BlueFS::truncate(FileWriter *h, uint64_t offset) vselector->sub_usage(h->file->vselector_hint, h->file->fnode.size); h->file->fnode.size = offset; vselector->add_usage(h->file->vselector_hint, h->file->fnode.size); - std::lock_guard ll(log.lock); - log.t.op_file_update(h->file->fnode); + log_t.op_file_update(h->file->fnode); return 0; } -int BlueFS::fsync(FileWriter *h) +int BlueFS::_fsync(FileWriter *h, std::unique_lock& l) { - std::unique_lock hl(h->lock); - uint64_t old_dirty_seq = 0; - { - dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl; - int r = _flush_F(h, true); - if (r < 0) - return r; - _flush_bdev(h); - if (h->file->is_dirty) { - _signal_dirty_to_log_D(h); - h->file->is_dirty = false; - } - { - std::lock_guard dl(dirty.lock); - if (dirty.seq_stable < h->file->dirty_seq) { - old_dirty_seq = h->file->dirty_seq; - dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq - << ") on " << h->file->fnode << ", flushing log" << dendl; - } - } + dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl; + int r = _flush(h, true); + if (r < 0) + return r; + if (h->file->is_dirty) { + _signal_dirty_to_log(h); + h->file->is_dirty = false; } + uint64_t old_dirty_seq = h->file->dirty_seq; + + _flush_bdev_safely(h); + if (old_dirty_seq) { - _flush_and_sync_log_LD(old_dirty_seq); + uint64_t s = log_seq; + dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq + << ") on " << h->file->fnode << ", flushing log" << dendl; + _flush_and_sync_log(l, old_dirty_seq); + ceph_assert(h->file->dirty_seq == 0 || // cleaned + h->file->dirty_seq > s); // or redirtied by someone else } - _maybe_compact_log_LN_NF_LD_D(); return 0; } -// be careful - either h->file->lock or log.lock must be taken -void BlueFS::_flush_bdev(FileWriter *h) +void BlueFS::_flush_bdev_safely(FileWriter *h) { - if (h->file->fnode.ino != 1) { - ceph_assert(ceph_mutex_is_locked(h->lock)); - } else { - ceph_assert(ceph_mutex_is_locked(log.lock)); - } std::array flush_devs = h->dirty_devs; h->dirty_devs.fill(false); #ifdef HAVE_LIBAIO if (!cct->_conf->bluefs_sync_write) { list completed_ios; _claim_completed_aios(h, &completed_ios); - _wait_for_aio(h); + lock.unlock(); + wait_for_aio(h); completed_ios.clear(); - } + flush_bdev(flush_devs); + lock.lock(); + } else #endif - _flush_bdev(flush_devs); + { + lock.unlock(); + flush_bdev(flush_devs); + lock.lock(); + } } -void BlueFS::_flush_bdev(std::array& dirty_bdevs) +void BlueFS::flush_bdev(std::array& dirty_bdevs) { // NOTE: this is safe to call without a lock. dout(20) << __func__ << dendl; @@ -3243,7 +2987,7 @@ void BlueFS::_flush_bdev(std::array& dirty_bdevs) } } -void BlueFS::_flush_bdev() +void BlueFS::flush_bdev() { // NOTE: this is safe to call without a lock. dout(20) << __func__ << dendl; @@ -3364,10 +3108,8 @@ int BlueFS::_allocate(uint8_t id, uint64_t len, return 0; } -int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len) +int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len) { - std::lock_guard ll(log.lock); - std::lock_guard fl(f->lock); dout(10) << __func__ << " file " << f->fnode << " 0x" << std::hex << off << "~" << len << std::dec << dendl; if (f->deleted) { @@ -3386,44 +3128,39 @@ int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len) vselector->add_usage(f->vselector_hint, f->fnode); if (r < 0) return r; - log.t.op_file_update(f->fnode); + log_t.op_file_update(f->fnode); } return 0; } void BlueFS::sync_metadata(bool avoid_compact) { - bool can_skip_flush; - { - std::lock_guard ll(log.lock); - std::lock_guard dl(dirty.lock); - can_skip_flush = log.t.empty() && dirty.files.empty(); - } - if (can_skip_flush) { + std::unique_lock l(lock); + if (log_t.empty() && dirty_files.empty()) { dout(10) << __func__ << " - no pending log events" << dendl; } else { utime_t start; lgeneric_subdout(cct, bluefs, 10) << __func__; start = ceph_clock_now(); *_dout << dendl; - _flush_bdev(); // FIXME? - _flush_and_sync_log_LD(); + flush_bdev(); // FIXME? + _flush_and_sync_log(l); dout(10) << __func__ << " done in " << (ceph_clock_now() - start) << dendl; } if (!avoid_compact) { - _maybe_compact_log_LN_NF_LD_D(); + _maybe_compact_log(l); } } -void BlueFS::_maybe_compact_log_LN_NF_LD_D() +void BlueFS::_maybe_compact_log(std::unique_lock& l) { if (!cct->_conf->bluefs_replay_recovery_disable_compact && - _should_start_compact_log_L_N()) { + _should_compact_log()) { if (cct->_conf->bluefs_compact_log_sync) { - _compact_log_sync_LN_LD(); + _compact_log_sync(); } else { - _compact_log_async_LD_NF_D(); + _compact_log_async(l); } } } @@ -3434,11 +3171,11 @@ int BlueFS::open_for_write( FileWriter **h, bool overwrite) { - std::unique_lock nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; - map::iterator p = nodes.dir_map.find(dirname); + map::iterator p = dir_map.find(dirname); DirRef dir; - if (p == nodes.dir_map.end()) { + if (p == dir_map.end()) { // implicitly create the dir dout(20) << __func__ << " dir " << dirname << " does not exist" << dendl; @@ -3460,7 +3197,7 @@ int BlueFS::open_for_write( } file = ceph::make_ref(); file->fnode.ino = ++ino_last; - nodes.file_map[ino_last] = file; + file_map[ino_last] = file; dir->file_map[string{filename}] = file; ++file->refs; create = true; @@ -3496,13 +3233,11 @@ int BlueFS::open_for_write( dout(20) << __func__ << " mapping " << dirname << "/" << filename << " vsel_hint " << file->vselector_hint << dendl; - nl.unlock(); - { - std::lock_guard ll(log.lock); - log.t.op_file_update(file->fnode); - if (create) - log.t.op_dir_link(dirname, filename, file->fnode.ino); - } + + log_t.op_file_update(file->fnode); + if (create) + log_t.op_dir_link(dirname, filename, file->fnode.ino); + *h = _create_writer(file); if (boost::algorithm::ends_with(filename, ".log")) { @@ -3532,7 +3267,7 @@ BlueFS::FileWriter *BlueFS::_create_writer(FileRef f) return w; } -void BlueFS::_drain_writer(FileWriter *h) +void BlueFS::_close_writer(FileWriter *h) { dout(10) << __func__ << " " << h << " type " << h->writer_type << dendl; //h->buffer.reassign_to_mempool(mempool::mempool_bluefs_file_writer); @@ -3548,31 +3283,18 @@ void BlueFS::_drain_writer(FileWriter *h) if (h->file->fnode.size >= (1ull << 30)) { dout(10) << __func__ << " file is unexpectedly large:" << h->file->fnode << dendl; } -} - -void BlueFS::_close_writer(FileWriter *h) -{ - _drain_writer(h); - delete h; -} -void BlueFS::close_writer(FileWriter *h) -{ - { - std::lock_guard l(h->lock); - _drain_writer(h); - } delete h; } uint64_t BlueFS::debug_get_dirty_seq(FileWriter *h) { - std::lock_guard l(h->lock); + std::lock_guard l(lock); return h->file->dirty_seq; } bool BlueFS::debug_get_is_dev_dirty(FileWriter *h, uint8_t dev) { - std::lock_guard l(h->lock); + std::lock_guard l(lock); return h->dirty_devs[dev]; } @@ -3582,11 +3304,11 @@ int BlueFS::open_for_read( FileReader **h, bool random) { - std::lock_guard nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << "/" << filename << (random ? " (random)":" (sequential)") << dendl; - map::iterator p = nodes.dir_map.find(dirname); - if (p == nodes.dir_map.end()) { + map::iterator p = dir_map.find(dirname); + if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " not found" << dendl; return -ENOENT; } @@ -3611,12 +3333,11 @@ int BlueFS::rename( std::string_view old_dirname, std::string_view old_filename, std::string_view new_dirname, std::string_view new_filename) { - std::lock_guard ll(log.lock); - std::lock_guard nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " " << old_dirname << "/" << old_filename << " -> " << new_dirname << "/" << new_filename << dendl; - map::iterator p = nodes.dir_map.find(old_dirname); - if (p == nodes.dir_map.end()) { + map::iterator p = dir_map.find(old_dirname); + if (p == dir_map.end()) { dout(20) << __func__ << " dir " << old_dirname << " not found" << dendl; return -ENOENT; } @@ -3630,8 +3351,8 @@ int BlueFS::rename( } FileRef file = q->second; - p = nodes.dir_map.find(new_dirname); - if (p == nodes.dir_map.end()) { + p = dir_map.find(new_dirname); + if (p == dir_map.end()) { dout(20) << __func__ << " dir " << new_dirname << " not found" << dendl; return -ENOENT; } @@ -3642,8 +3363,8 @@ int BlueFS::rename( << ") file " << new_filename << " already exists, unlinking" << dendl; ceph_assert(q->second != file); - log.t.op_dir_unlink(new_dirname, new_filename); - _drop_link_D(q->second); + log_t.op_dir_unlink(new_dirname, new_filename); + _drop_link(q->second); } dout(10) << __func__ << " " << new_dirname << "/" << new_filename << " " @@ -3652,33 +3373,31 @@ int BlueFS::rename( new_dir->file_map[string{new_filename}] = file; old_dir->file_map.erase(string{old_filename}); - log.t.op_dir_link(new_dirname, new_filename, file->fnode.ino); - log.t.op_dir_unlink(old_dirname, old_filename); + log_t.op_dir_link(new_dirname, new_filename, file->fnode.ino); + log_t.op_dir_unlink(old_dirname, old_filename); return 0; } int BlueFS::mkdir(std::string_view dirname) { - std::lock_guard ll(log.lock); - std::lock_guard nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << dendl; - map::iterator p = nodes.dir_map.find(dirname); - if (p != nodes.dir_map.end()) { + map::iterator p = dir_map.find(dirname); + if (p != dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " exists" << dendl; return -EEXIST; } - nodes.dir_map[string{dirname}] = ceph::make_ref(); - log.t.op_dir_create(dirname); + dir_map[string{dirname}] = ceph::make_ref(); + log_t.op_dir_create(dirname); return 0; } int BlueFS::rmdir(std::string_view dirname) { - std::lock_guard ll(log.lock); - std::lock_guard nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << dendl; - auto p = nodes.dir_map.find(dirname); - if (p == nodes.dir_map.end()) { + auto p = dir_map.find(dirname); + if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " does not exist" << dendl; return -ENOENT; } @@ -3687,16 +3406,16 @@ int BlueFS::rmdir(std::string_view dirname) dout(20) << __func__ << " dir " << dirname << " not empty" << dendl; return -ENOTEMPTY; } - nodes.dir_map.erase(string{dirname}); - log.t.op_dir_remove(dirname); + dir_map.erase(string{dirname}); + log_t.op_dir_remove(dirname); return 0; } bool BlueFS::dir_exists(std::string_view dirname) { - std::lock_guard nl(nodes.lock); - map::iterator p = nodes.dir_map.find(dirname); - bool exists = p != nodes.dir_map.end(); + std::lock_guard l(lock); + map::iterator p = dir_map.find(dirname); + bool exists = p != dir_map.end(); dout(10) << __func__ << " " << dirname << " = " << (int)exists << dendl; return exists; } @@ -3704,10 +3423,10 @@ bool BlueFS::dir_exists(std::string_view dirname) int BlueFS::stat(std::string_view dirname, std::string_view filename, uint64_t *size, utime_t *mtime) { - std::lock_guard nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; - map::iterator p = nodes.dir_map.find(dirname); - if (p == nodes.dir_map.end()) { + map::iterator p = dir_map.find(dirname); + if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " not found" << dendl; return -ENOENT; } @@ -3732,11 +3451,10 @@ int BlueFS::stat(std::string_view dirname, std::string_view filename, int BlueFS::lock_file(std::string_view dirname, std::string_view filename, FileLock **plock) { - std::lock_guard ll(log.lock); - std::lock_guard nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; - map::iterator p = nodes.dir_map.find(dirname); - if (p == nodes.dir_map.end()) { + map::iterator p = dir_map.find(dirname); + if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " not found" << dendl; return -ENOENT; } @@ -3750,11 +3468,11 @@ int BlueFS::lock_file(std::string_view dirname, std::string_view filename, file = ceph::make_ref(); file->fnode.ino = ++ino_last; file->fnode.mtime = ceph_clock_now(); - nodes.file_map[ino_last] = file; + file_map[ino_last] = file; dir->file_map[string{filename}] = file; ++file->refs; - log.t.op_file_update(file->fnode); - log.t.op_dir_link(dirname, filename, file->fnode.ino); + log_t.op_file_update(file->fnode); + log_t.op_dir_link(dirname, filename, file->fnode.ino); } else { file = q->second; if (file->locked) { @@ -3771,7 +3489,7 @@ int BlueFS::lock_file(std::string_view dirname, std::string_view filename, int BlueFS::unlock_file(FileLock *fl) { - std::lock_guard nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " " << fl << " on " << fl->file->fnode << dendl; ceph_assert(fl->file->locked); fl->file->locked = false; @@ -3785,18 +3503,18 @@ int BlueFS::readdir(std::string_view dirname, vector *ls) if (!dirname.empty() && dirname.back() == '/') { dirname.remove_suffix(1); } - std::lock_guard nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << dendl; if (dirname.empty()) { // list dirs - ls->reserve(nodes.dir_map.size() + 2); - for (auto& q : nodes.dir_map) { + ls->reserve(dir_map.size() + 2); + for (auto& q : dir_map) { ls->push_back(q.first); } } else { // list files in dir - map::iterator p = nodes.dir_map.find(dirname); - if (p == nodes.dir_map.end()) { + map::iterator p = dir_map.find(dirname); + if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " not found" << dendl; return -ENOENT; } @@ -3813,11 +3531,10 @@ int BlueFS::readdir(std::string_view dirname, vector *ls) int BlueFS::unlink(std::string_view dirname, std::string_view filename) { - std::lock_guard ll(log.lock); - std::lock_guard nl(nodes.lock); + std::lock_guard l(lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; - map::iterator p = nodes.dir_map.find(dirname); - if (p == nodes.dir_map.end()) { + map::iterator p = dir_map.find(dirname); + if (p == dir_map.end()) { dout(20) << __func__ << " dir " << dirname << " not found" << dendl; return -ENOENT; } @@ -3835,8 +3552,8 @@ int BlueFS::unlink(std::string_view dirname, std::string_view filename) return -EBUSY; } dir->file_map.erase(string{filename}); - log.t.op_dir_unlink(dirname, filename); - _drop_link_D(file); + log_t.op_dir_unlink(dirname, filename); + _drop_link(file); return 0; } @@ -3859,7 +3576,7 @@ bool BlueFS::wal_is_rotational() When we find it, we decode following bytes as extent. We read that whole extent and then check if merged with existing log part gives a proper bluefs transaction. */ -int BlueFS::_do_replay_recovery_read(FileReader *log_reader, +int BlueFS::do_replay_recovery_read(FileReader *log_reader, size_t replay_pos, size_t read_offset, size_t read_len, @@ -3914,7 +3631,7 @@ int BlueFS::_do_replay_recovery_read(FileReader *log_reader, dout(2) << __func__ << " processing " << get_device_name(dev) << dendl; interval_set disk_regions; disk_regions.insert(0, bdev[dev]->get_size()); - for (auto f : nodes.file_map) { + for (auto f : file_map) { auto& e = f.second->fnode.extents; for (auto& p : e) { if (p.bdev == dev) { diff --git a/src/os/bluestore/BlueFS.h b/src/os/bluestore/BlueFS.h index 085b131d0dcc2..b285592b41df4 100644 --- a/src/os/bluestore/BlueFS.h +++ b/src/os/bluestore/BlueFS.h @@ -121,11 +121,6 @@ public: std::atomic_int num_reading; void* vselector_hint = nullptr; - /* lock protects fnode and other the parts that can be modified during read & write operations. - Does not protect values that are fixed - Does not need to be taken when doing one-time operations: - _replay, device_migrate_to_existing, device_migrate_to_new */ - ceph::mutex lock = ceph::make_mutex("BlueFS::File::lock"); private: FRIEND_MAKE_REF(File); @@ -309,6 +304,8 @@ public: }; private: + ceph::mutex lock = ceph::make_mutex("BlueFS::lock"); + PerfCounters *logger = nullptr; uint64_t max_bytes[MAX_BDEV] = {0}; @@ -319,35 +316,26 @@ private: }; // cache - struct { - ceph::mutex lock = ceph::make_mutex("BlueFS::nodes.lock"); - mempool::bluefs::map> dir_map; ///< dirname -> Dir - mempool::bluefs::unordered_map file_map; ///< ino -> File - } nodes; + mempool::bluefs::map> dir_map; ///< dirname -> Dir + mempool::bluefs::unordered_map file_map; ///< ino -> File + + // map of dirty files, files of same dirty_seq are grouped into list. + std::map dirty_files; bluefs_super_t super; ///< latest superblock (as last written) uint64_t ino_last = 0; ///< last assigned ino (this one is in use) + uint64_t log_seq = 0; ///< last used log seq (by current pending log_t) + uint64_t log_seq_stable = 0; ///< last stable/synced log seq + FileWriter *log_writer = 0; ///< writer for the log + bluefs_transaction_t log_t; ///< pending, unwritten log transaction + bool log_flushing = false; ///< true while flushing the log + ceph::condition_variable log_cond; + + uint64_t new_log_jump_to = 0; + uint64_t old_log_jump_to = 0; + FileRef new_log = nullptr; + FileWriter *new_log_writer = nullptr; - struct { - ceph::mutex lock = ceph::make_mutex("BlueFS::log.lock"); - //uint64_t seq_stable = 0; //seq that is now stable on disk AK - consider also mirroring this - uint64_t seq_live = 1; //seq that log is currently writing to; mirrors dirty.seq_live - FileWriter *writer = 0; - bluefs_transaction_t t; - } log; - - struct { - ceph::mutex lock = ceph::make_mutex("BlueFS::dirty.lock"); - uint64_t seq_stable = 0; //seq that is now stable on disk - uint64_t seq_live = 1; //seq that is ongoing and dirty files will be written to - // map of dirty files, files of same dirty_seq are grouped into list. - std::map files; - } dirty; - - ceph::condition_variable log_cond; ///< used for state control between log flush / log compaction - std::atomic log_is_compacting{false}; ///< signals that bluefs log is already ongoing compaction - std::atomic log_forbidden_to_expand{false}; ///< used to signal that async compaction is in state - /// that prohibits expansion of bluefs log /* * There are up to 3 block devices: * @@ -361,11 +349,6 @@ private: std::vector alloc; ///< allocators for bdevs std::vector alloc_size; ///< alloc size for each device std::vector> pending_release; ///< extents to release - // TODO: it should be examined what makes pending_release immune to - // eras in a way similar to dirty_files. Hints: - // 1) we have actually only 2 eras: log_seq and log_seq+1 - // 2) we usually not remove extents from files. And when we do, we force log-syncing. - //std::vector> block_unused_too_granular; BlockDevice::aio_callback_t discard_cb[3]; //discard callbacks for each dev @@ -397,7 +380,7 @@ private: FileRef _get_file(uint64_t ino); - void _drop_link_D(FileRef f); + void _drop_link(FileRef f); unsigned _get_slow_device_id() { return bdev[BDEV_SLOW] ? BDEV_SLOW : BDEV_DB; @@ -409,32 +392,22 @@ private: PExtentVector* extents); /* signal replay log to include h->file in nearest log flush */ - int _signal_dirty_to_log_D(FileWriter *h); - int _flush_range_F(FileWriter *h, uint64_t offset, uint64_t length); - int _flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered); - int _flush_F(FileWriter *h, bool force, bool *flushed = nullptr); - int _flush_special(FileWriter *h); - int _fsync(FileWriter *h); + int _signal_dirty_to_log(FileWriter *h); + int _flush_range(FileWriter *h, uint64_t offset, uint64_t length); + int _flush(FileWriter *h, bool force, std::unique_lock& l); + int _flush(FileWriter *h, bool force, bool *flushed = nullptr); + int _fsync(FileWriter *h, std::unique_lock& l); #ifdef HAVE_LIBAIO void _claim_completed_aios(FileWriter *h, std::list *ls); - void _wait_for_aio(FileWriter *h); // safe to call without a lock + void wait_for_aio(FileWriter *h); // safe to call without a lock #endif - int64_t _maybe_extend_log(); - void _extend_log(); - uint64_t _log_advance_seq(); - void _consume_dirty(uint64_t seq); - void _clear_dirty_set_stable_D(uint64_t seq_stable); - void _release_pending_allocations(std::vector>& to_release); - - void _flush_and_sync_log_core(int64_t available_runway); - int _flush_and_sync_log_jump_D(uint64_t jump_to, - int64_t available_runway); - int _flush_and_sync_log_LD(uint64_t want_seq = 0); - - uint64_t _estimate_log_size_N(); - bool _should_start_compact_log_L_N(); + int _flush_and_sync_log(std::unique_lock& l, + uint64_t want_seq = 0, + uint64_t jump_to = 0); + uint64_t _estimate_log_size(); + bool _should_compact_log(); enum { REMOVE_DB = 1, @@ -442,15 +415,12 @@ private: RENAME_SLOW2DB = 4, RENAME_DB2SLOW = 8, }; - void _compact_log_dump_metadata_N(bluefs_transaction_t *t, - int flags); - void compact_log_async_dump_metadata_NF(bluefs_transaction_t *t, - uint64_t capture_before_seq); + void _compact_log_dump_metadata(bluefs_transaction_t *t, + int flags); + void _compact_log_sync(); + void _compact_log_async(std::unique_lock& l); - void _compact_log_sync_LN_LD(); - void _compact_log_async_LD_NF_D(); - - void _rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback, + void _rewrite_log_and_layout_sync(bool allocate_with_fallback, int super_dev, int log_dev, int new_log_dev, @@ -459,9 +429,9 @@ private: //void _aio_finish(void *priv); - void _flush_bdev(FileWriter *h); - void _flush_bdev(); // this is safe to call without a lock - void _flush_bdev(std::array& dirty_bdevs); // this is safe to call without a lock + void _flush_bdev_safely(FileWriter *h); + void flush_bdev(); // this is safe to call without a lock + void flush_bdev(std::array& dirty_bdevs); // this is safe to call without a lock int _preallocate(FileRef f, uint64_t off, uint64_t len); int _truncate(FileWriter *h, uint64_t off); @@ -478,6 +448,8 @@ private: uint64_t len, ///< [in] this many bytes char *out); ///< [out] optional: or copy it here + void _invalidate_cache(FileRef f, uint64_t offset, uint64_t length); + int _open_super(); int _write_super(int dev); int _check_allocations(const bluefs_fnode_t& fnode, @@ -490,7 +462,6 @@ private: int _replay(bool noop, bool to_stdout = false); ///< replay journal FileWriter *_create_writer(FileRef f); - void _drain_writer(FileWriter *h); void _close_writer(FileWriter *h); // always put the super in the second 4k block. FIXME should this be @@ -556,8 +527,10 @@ public: FileReader **h, bool random = false); - // data added after last fsync() is lost - void close_writer(FileWriter *h); + void close_writer(FileWriter *h) { + std::lock_guard l(lock); + _close_writer(h); + } int rename(std::string_view old_dir, std::string_view old_file, std::string_view new_dir, std::string_view new_file); @@ -581,7 +554,7 @@ public: /// sync any uncommitted state to disk void sync_metadata(bool avoid_compact); /// test and compact log, if necessary - void _maybe_compact_log_LN_NF_LD_D(); + void _maybe_compact_log(std::unique_lock& l); void set_volume_selector(BlueFSVolumeSelector* s) { vselector.reset(s); @@ -603,11 +576,42 @@ public: // handler for discard event void handle_discard(unsigned dev, interval_set& to_release); - void flush(FileWriter *h, bool force = false); + void flush(FileWriter *h, bool force = false) { + std::unique_lock l(lock); + int r = _flush(h, force, l); + ceph_assert(r == 0); + } - void append_try_flush(FileWriter *h, const char* buf, size_t len); - void flush_range(FileWriter *h, uint64_t offset, uint64_t length); - int fsync(FileWriter *h); + void append_try_flush(FileWriter *h, const char* buf, size_t len) { + size_t max_size = 1ull << 30; // cap to 1GB + while (len > 0) { + bool need_flush = true; + auto l0 = h->get_buffer_length(); + if (l0 < max_size) { + size_t l = std::min(len, max_size - l0); + h->append(buf, l); + buf += l; + len -= l; + need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size; + } + if (need_flush) { + flush(h, true); + // make sure we've made any progress with flush hence the + // loop doesn't iterate forever + ceph_assert(h->get_buffer_length() < max_size); + } + } + } + void flush_range(FileWriter *h, uint64_t offset, uint64_t length) { + std::lock_guard l(lock); + _flush_range(h, offset, length); + } + int fsync(FileWriter *h) { + std::unique_lock l(lock); + int r = _fsync(h, l); + _maybe_compact_log(l); + return r; + } int64_t read(FileReader *h, uint64_t offset, size_t len, ceph::buffer::list *outbl, char *out) { // no need to hold the global lock here; we only touch h and @@ -622,14 +626,23 @@ public: // atomics and asserts). return _read_random(h, offset, len, out); } - void invalidate_cache(FileRef f, uint64_t offset, uint64_t len); - int preallocate(FileRef f, uint64_t offset, uint64_t len); - int truncate(FileWriter *h, uint64_t offset); - int _do_replay_recovery_read(FileReader *log, - size_t log_pos, - size_t read_offset, - size_t read_len, - bufferlist* bl); + void invalidate_cache(FileRef f, uint64_t offset, uint64_t len) { + std::lock_guard l(lock); + _invalidate_cache(f, offset, len); + } + int preallocate(FileRef f, uint64_t offset, uint64_t len) { + std::lock_guard l(lock); + return _preallocate(f, offset, len); + } + int truncate(FileWriter *h, uint64_t offset) { + std::lock_guard l(lock); + return _truncate(h, offset); + } + int do_replay_recovery_read(FileReader *log, + size_t log_pos, + size_t read_offset, + size_t read_len, + bufferlist* bl); size_t probe_alloc_avail(int dev, uint64_t alloc_size); @@ -695,22 +708,5 @@ public: void get_paths(const std::string& base, paths& res) const override; }; -/** - * Directional graph of locks. - * Vertices - Locks. Edges (directed) - locking progression. - * Edge A->B exist if last taken lock was A and next taken lock is B. - * - * Column represents last lock taken. - * Row represents next lock taken. - * - * < | L | D | N | F | W - * -------------|---|---|---|---|--- - * log L | < < - * dirty D | - * nodes N | < - * File F | - * FileWriter W | < < < - * - * Claim: Deadlock is possible IFF graph contains cycles. - */ + #endif