From 55652f0819761d410bddcf3688b1c3e10ed64f5b Mon Sep 17 00:00:00 2001 From: Patrick Donnelly Date: Tue, 27 Aug 2024 23:05:11 -0400 Subject: [PATCH] osdc: properly acquire locks for getters This was left as a TODO. : / Signed-off-by: Patrick Donnelly --- src/osdc/Journaler.cc | 46 +++++++++++++-------------- src/osdc/Journaler.h | 73 ++++++++++++++++++++++++++++++++++--------- 2 files changed, 81 insertions(+), 38 deletions(-) diff --git a/src/osdc/Journaler.cc b/src/osdc/Journaler.cc index c1c1178bd7f..40cf67702f4 100644 --- a/src/osdc/Journaler.cc +++ b/src/osdc/Journaler.cc @@ -158,7 +158,7 @@ public: void Journaler::recover(Context *onread) { lock_guard l(lock); - if (is_stopping()) { + if (state == STATE_STOPPING) { onread->complete(-EAGAIN); return; } @@ -218,7 +218,7 @@ void Journaler::_reread_head(Context *onfinish) void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) { lock_guard l(lock); - if (is_stopping()) { + if (state == STATE_STOPPING) { finish->complete(-EAGAIN); return; } @@ -250,7 +250,7 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) void Journaler::_finish_read_head(int r, bufferlist& bl) { lock_guard l(lock); - if (is_stopping()) + if (state == STATE_STOPPING) return; ceph_assert(state == STATE_READHEAD); @@ -342,7 +342,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, C_OnFinisher *onfinish) { lock_guard l(lock); - if (is_stopping()) { + if (state == STATE_STOPPING) { onfinish->complete(-EAGAIN); return; } @@ -359,7 +359,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, void Journaler::_finish_probe_end(int r, uint64_t end) { lock_guard l(lock); - if (is_stopping()) + if (state == STATE_STOPPING) return; ceph_assert(state == STATE_PROBING); @@ -413,7 +413,7 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish) { // Expect to be called back from finish_reread_head, which already takes lock // lock is locked - if (is_stopping()) { + if (state == STATE_STOPPING) { onfinish->complete(-EAGAIN); return; } @@ -605,7 +605,7 @@ uint64_t Journaler::append_entry(bufferlist& bl) write_pos += wrote; // flush previous object? - uint64_t su = get_layout_period(); + uint64_t su = layout.get_period(); ceph_assert(su > 0); uint64_t write_off = write_pos % su; uint64_t write_obj = write_pos / su; @@ -630,7 +630,7 @@ uint64_t Journaler::append_entry(bufferlist& bl) void Journaler::_do_flush(unsigned amount) { - if (is_stopping()) + if (state == STATE_STOPPING) return; if (write_pos == flush_pos) return; @@ -645,7 +645,7 @@ void Journaler::_do_flush(unsigned amount) // zero at least two full periods ahead. this ensures // that the next object will not exist. - uint64_t period = get_layout_period(); + uint64_t period = layout.get_period(); if (flush_pos + len + 2*period > prezero_pos) { _issue_prezero(); @@ -718,7 +718,7 @@ void Journaler::_do_flush(unsigned amount) void Journaler::wait_for_flush(Context *onsafe) { lock_guard l(lock); - if (is_stopping()) { + if (state == STATE_STOPPING) { if (onsafe) onsafe->complete(-EAGAIN); return; @@ -752,7 +752,7 @@ void Journaler::_wait_for_flush(Context *onsafe) void Journaler::flush(Context *onsafe) { lock_guard l(lock); - if (is_stopping()) { + if (state == STATE_STOPPING) { if (onsafe) onsafe->complete(-EAGAIN); return; @@ -812,7 +812,7 @@ void Journaler::_issue_prezero() * issue zero requests based on write_pos, even though the invariant * is that we zero ahead of flush_pos. */ - uint64_t period = get_layout_period(); + uint64_t period = layout.get_period(); uint64_t to = write_pos + period * num_periods + period - 1; to -= to % period; @@ -1062,7 +1062,7 @@ void Journaler::_issue_read(uint64_t len) // here because it will wait for all object reads to complete before // giving us back any data. this way we can process whatever bits // come in that are contiguous. - uint64_t period = get_layout_period(); + uint64_t period = layout.get_period(); while (len > 0) { uint64_t e = requested_pos + period; e -= e % period; @@ -1079,7 +1079,7 @@ void Journaler::_issue_read(uint64_t len) void Journaler::_prefetch() { - if (is_stopping()) + if (state == STATE_STOPPING) return; ldout(cct, 10) << "_prefetch" << dendl; @@ -1096,7 +1096,7 @@ void Journaler::_prefetch() uint64_t raw_target = read_pos + pf; // read full log segments, so increase if necessary - uint64_t period = get_layout_period(); + uint64_t period = layout.get_period(); uint64_t remainder = raw_target % period; uint64_t adjustment = remainder ? period - remainder : 0; uint64_t target = raw_target + adjustment; @@ -1215,8 +1215,8 @@ void Journaler::erase(Context *completion) lock_guard l(lock); // Async delete the journal data - uint64_t first = trimmed_pos / get_layout_period(); - uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2; + uint64_t first = trimmed_pos / layout.get_period(); + uint64_t num = (write_pos - trimmed_pos) / layout.get_period() + 2; filer.purge_range(ino, &layout, SnapContext(), first, num, ceph::real_clock::now(), 0, wrap_finisher(new C_EraseFinish( @@ -1231,7 +1231,7 @@ void Journaler::erase(Context *completion) void Journaler::_finish_erase(int data_result, C_OnFinisher *completion) { lock_guard l(lock); - if (is_stopping()) { + if (state == STATE_STOPPING) { completion->complete(-EAGAIN); return; } @@ -1309,7 +1309,7 @@ void Journaler::wait_for_readable(Context *onreadable) void Journaler::_wait_for_readable(Context *onreadable) { - if (is_stopping()) { + if (state == STATE_STOPPING) { finisher->queue(onreadable, -EAGAIN); return; } @@ -1354,11 +1354,11 @@ void Journaler::trim() void Journaler::_trim() { - if (is_stopping()) + if (state == STATE_STOPPING) return; ceph_assert(!readonly); - uint64_t period = get_layout_period(); + uint64_t period = layout.get_period(); uint64_t trim_to = last_committed.expire_pos; trim_to -= trim_to % period; ldout(cct, 10) << "trim last_commited head was " << last_committed @@ -1633,8 +1633,8 @@ void Journaler::check_isreadable() { std::unique_lock l(lock); while (!_is_readable() && - get_read_pos() < get_write_pos() && - !get_error()) { + read_pos < write_pos && + !error) { C_SaferCond readable_waiter; _wait_for_readable(&readable_waiter); l.unlock(); diff --git a/src/osdc/Journaler.h b/src/osdc/Journaler.h index 4c141fea161..4e2db0c97b7 100644 --- a/src/osdc/Journaler.h +++ b/src/osdc/Journaler.h @@ -222,10 +222,10 @@ public: private: // me CephContext *cct; - std::mutex lock; + mutable ceph::mutex lock; const std::string name; - typedef std::lock_guard lock_guard; - typedef std::unique_lock unique_lock; + typedef std::lock_guard lock_guard; + typedef std::unique_lock unique_lock; Finisher *finisher; Header last_written; inodeno_t ino; @@ -408,7 +408,7 @@ public: Journaler(const std::string &name_, inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, Finisher *f) : last_committed(mag), - cct(obj->cct), name(name_), finisher(f), last_written(mag), + cct(obj->cct), lock(ceph::make_mutex("Journaler::" + name_)), name(name_), finisher(f), last_written(mag), ino(ino_), pg_pool(pool), readonly(true), stream_format(-1), journal_stream(-1), magic(mag), @@ -528,24 +528,67 @@ public: // Synchronous getters // =================== - // TODO: need some locks on reads for true safety uint64_t get_layout_period() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); return layout.get_period(); } - file_layout_t& get_layout() { return layout; } - bool is_active() { return state == STATE_ACTIVE; } - bool is_stopping() { return state == STATE_STOPPING; } - int get_error() { return error; } - bool is_readonly() { return readonly; } + file_layout_t get_layout() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); + return layout; + } + bool is_active() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); + return state == STATE_ACTIVE; + } + bool is_stopping() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); + return state == STATE_STOPPING; + } + int get_error() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); + return error; + } + bool is_readonly() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); + return readonly; + } bool is_readable(); bool _is_readable(); bool try_read_entry(bufferlist& bl); - uint64_t get_write_pos() const { return write_pos; } - uint64_t get_write_safe_pos() const { return safe_pos; } - uint64_t get_read_pos() const { return read_pos; } - uint64_t get_expire_pos() const { return expire_pos; } - uint64_t get_trimmed_pos() const { return trimmed_pos; } + uint64_t get_write_pos() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); + return write_pos; + } + uint64_t get_write_safe_pos() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); + return safe_pos; + } + uint64_t get_read_pos() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); + return read_pos; + } + uint64_t get_expire_pos() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); + return expire_pos; + } + uint64_t get_trimmed_pos() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); + return trimmed_pos; + } size_t get_journal_envelope_size() const { + ceph_assert(!ceph_mutex_is_locked_by_me(lock)); + lock_guard l(lock); return journal_stream.get_envelope_size(); } void check_isreadable(); -- 2.39.5