]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osdc: properly acquire locks for getters
authorPatrick Donnelly <pdonnell@redhat.com>
Wed, 28 Aug 2024 03:05:11 +0000 (23:05 -0400)
committerPatrick Donnelly <pdonnell@redhat.com>
Wed, 25 Sep 2024 19:42:26 +0000 (15:42 -0400)
This was left as a TODO. : /

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
src/osdc/Journaler.cc
src/osdc/Journaler.h

index c1c1178bd7f699b49c8432d796e5827d98031524..40cf67702f4c66ff7263c42d857900e762849af7 100644 (file)
@@ -158,7 +158,7 @@ public:
 void Journaler::recover(Context *onread) 
 {
   lock_guard l(lock);
-  if (is_stopping()) {
+  if (state == STATE_STOPPING) {
     onread->complete(-EAGAIN);
     return;
   }
@@ -218,7 +218,7 @@ void Journaler::_reread_head(Context *onfinish)
 void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
 {
   lock_guard l(lock);
-  if (is_stopping()) {
+  if (state == STATE_STOPPING) {
     finish->complete(-EAGAIN);
     return;
   }
@@ -250,7 +250,7 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
 void Journaler::_finish_read_head(int r, bufferlist& bl)
 {
   lock_guard l(lock);
-  if (is_stopping())
+  if (state == STATE_STOPPING)
     return;
 
   ceph_assert(state == STATE_READHEAD);
@@ -342,7 +342,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
                                C_OnFinisher *onfinish)
 {
   lock_guard l(lock);
-  if (is_stopping()) {
+  if (state == STATE_STOPPING) {
     onfinish->complete(-EAGAIN);
     return;
   }
@@ -359,7 +359,7 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
 void Journaler::_finish_probe_end(int r, uint64_t end)
 {
   lock_guard l(lock);
-  if (is_stopping())
+  if (state == STATE_STOPPING)
     return;
 
   ceph_assert(state == STATE_PROBING);
@@ -413,7 +413,7 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
 {
   // Expect to be called back from finish_reread_head, which already takes lock
   // lock is locked
-  if (is_stopping()) {
+  if (state == STATE_STOPPING) {
     onfinish->complete(-EAGAIN);
     return;
   }
@@ -605,7 +605,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
   write_pos += wrote;
 
   // flush previous object?
-  uint64_t su = get_layout_period();
+  uint64_t su = layout.get_period();
   ceph_assert(su > 0);
   uint64_t write_off = write_pos % su;
   uint64_t write_obj = write_pos / su;
@@ -630,7 +630,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
 
 void Journaler::_do_flush(unsigned amount)
 {
-  if (is_stopping())
+  if (state == STATE_STOPPING)
     return;
   if (write_pos == flush_pos)
     return;
@@ -645,7 +645,7 @@ void Journaler::_do_flush(unsigned amount)
 
   // zero at least two full periods ahead.  this ensures
   // that the next object will not exist.
-  uint64_t period = get_layout_period();
+  uint64_t period = layout.get_period();
   if (flush_pos + len + 2*period > prezero_pos) {
     _issue_prezero();
 
@@ -718,7 +718,7 @@ void Journaler::_do_flush(unsigned amount)
 void Journaler::wait_for_flush(Context *onsafe)
 {
   lock_guard l(lock);
-  if (is_stopping()) {
+  if (state == STATE_STOPPING) {
     if (onsafe)
       onsafe->complete(-EAGAIN);
     return;
@@ -752,7 +752,7 @@ void Journaler::_wait_for_flush(Context *onsafe)
 void Journaler::flush(Context *onsafe)
 {
   lock_guard l(lock);
-  if (is_stopping()) {
+  if (state == STATE_STOPPING) {
     if (onsafe)
       onsafe->complete(-EAGAIN);
     return;
@@ -812,7 +812,7 @@ void Journaler::_issue_prezero()
    * issue zero requests based on write_pos, even though the invariant
    * is that we zero ahead of flush_pos.
    */
-  uint64_t period = get_layout_period();
+  uint64_t period = layout.get_period();
   uint64_t to = write_pos + period * num_periods  + period - 1;
   to -= to % period;
 
@@ -1062,7 +1062,7 @@ void Journaler::_issue_read(uint64_t len)
   // here because it will wait for all object reads to complete before
   // giving us back any data.  this way we can process whatever bits
   // come in that are contiguous.
-  uint64_t period = get_layout_period();
+  uint64_t period = layout.get_period();
   while (len > 0) {
     uint64_t e = requested_pos + period;
     e -= e % period;
@@ -1079,7 +1079,7 @@ void Journaler::_issue_read(uint64_t len)
 
 void Journaler::_prefetch()
 {
-  if (is_stopping())
+  if (state == STATE_STOPPING)
     return;
 
   ldout(cct, 10) << "_prefetch" << dendl;
@@ -1096,7 +1096,7 @@ void Journaler::_prefetch()
   uint64_t raw_target = read_pos + pf;
 
   // read full log segments, so increase if necessary
-  uint64_t period = get_layout_period();
+  uint64_t period = layout.get_period();
   uint64_t remainder = raw_target % period;
   uint64_t adjustment = remainder ? period - remainder : 0;
   uint64_t target = raw_target + adjustment;
@@ -1215,8 +1215,8 @@ void Journaler::erase(Context *completion)
   lock_guard l(lock);
 
   // Async delete the journal data
-  uint64_t first = trimmed_pos / get_layout_period();
-  uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
+  uint64_t first = trimmed_pos / layout.get_period();
+  uint64_t num = (write_pos - trimmed_pos) / layout.get_period() + 2;
   filer.purge_range(ino, &layout, SnapContext(), first, num,
                    ceph::real_clock::now(), 0,
                    wrap_finisher(new C_EraseFinish(
@@ -1231,7 +1231,7 @@ void Journaler::erase(Context *completion)
 void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
 {
   lock_guard l(lock);
-  if (is_stopping()) {
+  if (state == STATE_STOPPING) {
     completion->complete(-EAGAIN);
     return;
   }
@@ -1309,7 +1309,7 @@ void Journaler::wait_for_readable(Context *onreadable)
 
 void Journaler::_wait_for_readable(Context *onreadable)
 {
-  if (is_stopping()) {
+  if (state == STATE_STOPPING) {
     finisher->queue(onreadable, -EAGAIN);
     return;
   }
@@ -1354,11 +1354,11 @@ void Journaler::trim()
 
 void Journaler::_trim()
 {
-  if (is_stopping())
+  if (state == STATE_STOPPING)
     return;
 
   ceph_assert(!readonly);
-  uint64_t period = get_layout_period();
+  uint64_t period = layout.get_period();
   uint64_t trim_to = last_committed.expire_pos;
   trim_to -= trim_to % period;
   ldout(cct, 10) << "trim last_commited head was " << last_committed
@@ -1633,8 +1633,8 @@ void Journaler::check_isreadable()
 {
   std::unique_lock l(lock);
   while (!_is_readable() &&
-      get_read_pos() < get_write_pos() &&
-      !get_error()) {
+      read_pos < write_pos &&
+      !error) {
     C_SaferCond readable_waiter;
     _wait_for_readable(&readable_waiter);
     l.unlock();
index 4c141fea161bf44f2266718af59eae87fae75be5..4e2db0c97b7dcab72a7e2244ed75523db65e63cf 100644 (file)
@@ -222,10 +222,10 @@ public:
 private:
   // me
   CephContext *cct;
-  std::mutex lock;
+  mutable ceph::mutex lock;
   const std::string name;
-  typedef std::lock_guard<std::mutex> lock_guard;
-  typedef std::unique_lock<std::mutex> unique_lock;
+  typedef std::lock_guard<ceph::mutex> lock_guard;
+  typedef std::unique_lock<ceph::mutex> unique_lock;
   Finisher *finisher;
   Header last_written;
   inodeno_t ino;
@@ -408,7 +408,7 @@ public:
   Journaler(const std::string &name_, inodeno_t ino_, int64_t pool,
       const char *mag, Objecter *obj, PerfCounters *l, int lkey, Finisher *f) :
     last_committed(mag),
-    cct(obj->cct), name(name_), finisher(f), last_written(mag),
+    cct(obj->cct), lock(ceph::make_mutex("Journaler::" + name_)), name(name_), finisher(f), last_written(mag),
     ino(ino_), pg_pool(pool), readonly(true),
     stream_format(-1), journal_stream(-1),
     magic(mag),
@@ -528,24 +528,67 @@ public:
 
   // Synchronous getters
   // ===================
-  // TODO: need some locks on reads for true safety
   uint64_t get_layout_period() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
     return layout.get_period();
   }
-  file_layout_t& get_layout() { return layout; }
-  bool is_active() { return state == STATE_ACTIVE; }
-  bool is_stopping() { return state == STATE_STOPPING; }
-  int get_error() { return error; }
-  bool is_readonly() { return readonly; }
+  file_layout_t get_layout() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
+    return layout;
+  }
+  bool is_active() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
+    return state == STATE_ACTIVE;
+  }
+  bool is_stopping() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
+    return state == STATE_STOPPING;
+  }
+  int get_error() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
+    return error;
+  }
+  bool is_readonly() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
+    return readonly;
+  }
   bool is_readable();
   bool _is_readable();
   bool try_read_entry(bufferlist& bl);
-  uint64_t get_write_pos() const { return write_pos; }
-  uint64_t get_write_safe_pos() const { return safe_pos; }
-  uint64_t get_read_pos() const { return read_pos; }
-  uint64_t get_expire_pos() const { return expire_pos; }
-  uint64_t get_trimmed_pos() const { return trimmed_pos; }
+  uint64_t get_write_pos() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
+    return write_pos;
+  }
+  uint64_t get_write_safe_pos() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
+    return safe_pos;
+  }
+  uint64_t get_read_pos() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
+    return read_pos;
+  }
+  uint64_t get_expire_pos() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
+    return expire_pos;
+  }
+  uint64_t get_trimmed_pos() const {
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
+    return trimmed_pos;
+  }
   size_t get_journal_envelope_size() const { 
+    ceph_assert(!ceph_mutex_is_locked_by_me(lock));
+    lock_guard l(lock);
     return journal_stream.get_envelope_size(); 
   }
   void check_isreadable();