]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
journal: fix flush by age and in-flight byte tracking
authorJason Dillaman <dillaman@redhat.com>
Tue, 5 Nov 2019 01:12:00 +0000 (20:12 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 19 Nov 2019 13:45:46 +0000 (08:45 -0500)
The flush by age was always causing an immediate flush due to a
backwards comparison. Additionally, the in-flight byte tracker was
never decremented which caused premature closure of the journal
object.

Finally, there was a potential race condition between closing the
object and in-flight notification callbacks executing. Now we keep
the lock held for both closed and overflow callbacks to prevent
the small chance of a race.

Fixes: https://tracker.ceph.com/issues/42598
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h
src/test/journal/test_ObjectRecorder.cc

index 9d34d190b029f1f2da4254ff94591a3d64591613..ca726c114aab0cc2c16a4de2d10497da48ef5bf8 100644 (file)
@@ -27,7 +27,7 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid,
     m_op_work_queue(work_queue), m_handler(handler),
     m_order(order), m_soft_max_size(1 << m_order),
     m_max_in_flight_appends(max_in_flight_appends),
-    m_lock(lock), m_last_flush_time(ceph_clock_now())
+    m_lock(lock)
 {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
@@ -97,8 +97,8 @@ void ObjectRecorder::flush(Context *on_safe) {
     // if currently handling flush notifications, wait so that
     // we notify in the correct order (since lock is dropped on
     // callback)
-    if (m_in_flight_flushes) {
-      m_in_flight_flushes_cond.wait(locker);
+    while (m_in_flight_callbacks) {
+      m_in_flight_callbacks_cond.wait(locker);
     }
 
     // attach the flush to the most recent append
@@ -125,27 +125,21 @@ void ObjectRecorder::flush(Context *on_safe) {
 void ObjectRecorder::flush(const ceph::ref_t<FutureImpl>& future) {
   ldout(m_cct, 20) << "flushing " << *future << dendl;
 
-  m_lock->lock();
-  {
-    auto flush_handler = future->get_flush_handler();
-    auto my_handler = get_flush_handler();
-    if (flush_handler != my_handler) {
-      // if we don't own this future, re-issue the flush so that it hits the
-      // correct journal object owner
-      future->flush();
-      m_lock->unlock();
-      return;
-    } else if (future->is_flush_in_progress()) {
-      m_lock->unlock();
-      return;
-    }
+  std::unique_lock locker{*m_lock};
+  auto flush_handler = future->get_flush_handler();
+  auto my_handler = get_flush_handler();
+  if (flush_handler != my_handler) {
+    // if we don't own this future, re-issue the flush so that it hits the
+    // correct journal object owner
+    future->flush();
+    return;
+  } else if (future->is_flush_in_progress()) {
+    return;
   }
 
-  bool overflowed = send_appends(true, future);
-  if (overflowed) {
-    notify_handler_unlock();
-  } else {
-    m_lock->unlock();
+  if (!m_object_closed && !m_overflowed && send_appends(true, future)) {
+    m_in_flight_callbacks = true;
+    notify_handler_unlock(locker, true);
   }
 }
 
@@ -169,52 +163,63 @@ bool ObjectRecorder::close() {
   ceph_assert(ceph_mutex_is_locked(*m_lock));
 
   ldout(m_cct, 20) << dendl;
+
   send_appends(true, {});
 
   ceph_assert(!m_object_closed);
   m_object_closed = true;
-  return (m_in_flight_tids.empty() && !m_in_flight_flushes);
+
+  if (!m_in_flight_tids.empty() || m_in_flight_callbacks) {
+    m_object_closed_notify = true;
+    return false;
+  }
+
+  return true;
 }
 
 void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
   ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl;
 
-  AppendBuffers append_buffers;
-  {
-    m_lock->lock();
-    auto tid_iter = m_in_flight_tids.find(tid);
-    ceph_assert(tid_iter != m_in_flight_tids.end());
-    m_in_flight_tids.erase(tid_iter);
+  std::unique_lock locker{*m_lock};
+  m_in_flight_callbacks = true;
 
-    InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
-    ceph_assert(iter != m_in_flight_appends.end());
+  auto tid_iter = m_in_flight_tids.find(tid);
+  ceph_assert(tid_iter != m_in_flight_tids.end());
+  m_in_flight_tids.erase(tid_iter);
 
-    if (r == -EOVERFLOW) {
-      ldout(m_cct, 10) << "append overflowed" << dendl;
-      m_overflowed = true;
+  InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
+  ceph_assert(iter != m_in_flight_appends.end());
 
-      // notify of overflow once all in-flight ops are complete
-      if (m_in_flight_tids.empty()) {
-        append_overflowed();
-        notify_handler_unlock();
-      } else {
-        m_lock->unlock();
-      }
-      return;
+  bool notify_overflowed = false;
+  AppendBuffers append_buffers;
+  if (r == -EOVERFLOW) {
+    ldout(m_cct, 10) << "append overflowed: "
+                     << "idle=" << m_in_flight_tids.empty() << ", "
+                     << "previous_overflow=" << m_overflowed << dendl;
+    if (m_in_flight_tids.empty()) {
+      append_overflowed();
     }
 
+    if (!m_object_closed && !m_overflowed) {
+      notify_overflowed = true;
+    }
+    m_overflowed = true;
+  } else {
     append_buffers.swap(iter->second);
     ceph_assert(!append_buffers.empty());
 
     for (auto& append_buffer : append_buffers) {
-      m_object_bytes += append_buffer.second.length();
+      auto length = append_buffer.second.length();
+      m_object_bytes += length;
+
+      ceph_assert(m_in_flight_bytes >= length);
+      m_in_flight_bytes -= length;
     }
     ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl;
 
     m_in_flight_appends.erase(iter);
-    m_in_flight_flushes = true;
-    m_lock->unlock();
   }
+  locker.unlock();
 
   // Flag the associated futures as complete.
   for (auto& append_buffer : append_buffers) {
@@ -222,22 +227,16 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
     append_buffer.first->safe(r);
   }
 
-  // wake up any flush requests that raced with a RADOS callback
-  m_lock->lock();
-  m_in_flight_flushes = false;
-  m_in_flight_flushes_cond.notify_all();
-
-  if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) {
-    // all remaining unsent appends should be redirected to new object
-    notify_handler_unlock();
-  } else {
-    bool overflowed = send_appends(false, {});
-    if (overflowed) {
-      notify_handler_unlock();
-    } else {
-      m_lock->unlock();
-    }
+  // attempt to kick off more appends to the object
+  locker.lock();
+  if (!m_object_closed && !m_overflowed && send_appends(false, {})) {
+    notify_overflowed = true;
   }
+
+  ldout(m_cct, 20) << "pending tids=" << m_in_flight_tids << dendl;
+
+  // all remaining unsent appends should be redirected to new object
+  notify_handler_unlock(locker, notify_overflowed);
 }
 
 void ObjectRecorder::append_overflowed() {
@@ -280,12 +279,17 @@ bool ObjectRecorder::send_appends(bool force, ceph::ref_t<FutureImpl> flush_futu
   if (!force &&
       ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) ||
        (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) ||
-       (m_flush_age > 0 &&
-        m_last_flush_time + m_flush_age >= ceph_clock_now()))) {
+       (m_flush_age > 0 && !m_last_flush_time.is_zero() &&
+        m_last_flush_time + m_flush_age <= ceph_clock_now()))) {
     ldout(m_cct, 20) << "forcing batch flush" << dendl;
     force = true;
   }
 
+  // start tracking flush time after the first append event
+  if (m_last_flush_time.is_zero()) {
+    m_last_flush_time = ceph_clock_now();
+  }
+
   auto max_in_flight_appends = m_max_in_flight_appends;
   if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) {
     if (!force && max_in_flight_appends == 0) {
@@ -315,10 +319,10 @@ bool ObjectRecorder::send_appends(bool force, ceph::ref_t<FutureImpl> flush_futu
     auto& bl = it->second;
     auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length();
     if (size == m_soft_max_size) {
-      ldout(m_cct, 10) << "object at capacity " << *future << dendl;
+      ldout(m_cct, 10) << "object at capacity (" << size << ") " << *future << dendl;
       m_overflowed = true;
     } else if (size > m_soft_max_size) {
-      ldout(m_cct, 10) << "object beyond capacity " << *future << dendl;
+      ldout(m_cct, 10) << "object beyond capacity (" << size << ") " << *future << dendl;
       m_overflowed = true;
       break;
     }
@@ -373,15 +377,43 @@ bool ObjectRecorder::send_appends(bool force, ceph::ref_t<FutureImpl> flush_futu
   return m_overflowed;
 }
 
-void ObjectRecorder::notify_handler_unlock() {
+void ObjectRecorder::wake_up_flushes() {
   ceph_assert(ceph_mutex_is_locked(*m_lock));
-  if (m_object_closed) {
-    m_lock->unlock();
-    m_handler->closed(this);
-  } else {
+  m_in_flight_callbacks = false;
+  m_in_flight_callbacks_cond.notify_all();
+}
+
+void ObjectRecorder::notify_handler_unlock(
+    std::unique_lock<ceph::mutex>& locker, bool notify_overflowed) {
+  ceph_assert(ceph_mutex_is_locked(*m_lock));
+  ceph_assert(m_in_flight_callbacks);
+
+  if (!m_object_closed && notify_overflowed) {
     // TODO need to delay completion until after aio_notify completes
-    m_lock->unlock();
+    ldout(m_cct, 10) << "overflow" << dendl;
+    ceph_assert(m_overflowed);
+
+    locker.unlock();
     m_handler->overflow(this);
+    locker.lock();
+  }
+
+  // wake up blocked flush requests
+  wake_up_flushes();
+
+  // An overflow notification might have blocked a close. A close
+  // notification could lead to the immediate destruction of this object
+  // so the object shouldn't be referenced anymore
+  bool object_closed_notify = false;
+  if (m_in_flight_tids.empty()) {
+    std::swap(object_closed_notify, m_object_closed_notify);
+  }
+  ceph_assert(m_object_closed || !object_closed_notify);
+  locker.unlock();
+
+  if (object_closed_notify) {
+    ldout(m_cct, 10) << "closed" << dendl;
+    m_handler->closed(this);
   }
 }
 
index 8b4e0a20dc3581f7194aa66775f8a64eafb7e6a5..1b36d246612459f18076461e1025834684695204 100644 (file)
@@ -135,20 +135,25 @@ private:
   InFlightTids m_in_flight_tids;
   InFlightAppends m_in_flight_appends;
   uint64_t m_object_bytes = 0;
+
   bool m_overflowed = false;
+
   bool m_object_closed = false;
+  bool m_object_closed_notify = false;
 
   bufferlist m_prefetch_bl;
 
-  bool m_in_flight_flushes = false;
-  ceph::condition_variable m_in_flight_flushes_cond;
+  bool m_in_flight_callbacks = false;
+  ceph::condition_variable m_in_flight_callbacks_cond;
   uint64_t m_in_flight_bytes = 0;
 
   bool send_appends(bool force, ceph::ref_t<FutureImpl> flush_sentinal);
   void handle_append_flushed(uint64_t tid, int r);
   void append_overflowed();
 
-  void notify_handler_unlock();
+  void wake_up_flushes();
+  void notify_handler_unlock(std::unique_lock<ceph::mutex>& locker,
+                             bool notify_overflowed);
 };
 
 } // namespace journal
index 72e4fd9c19442ee8d95745890a28d780b536aaf1..ac110a23e6fbe50b737a76ddf8d77a5d7080ad5a 100644 (file)
@@ -118,7 +118,8 @@ public:
     Handler m_handler;
   };
 
-  journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
+  journal::AppendBuffer create_append_buffer(uint64_t tag_tid,
+                                             uint64_t entry_tid,
                                              const std::string &payload) {
     auto future = ceph::make_ref<journal::FutureImpl>(tag_tid, entry_tid, 456);
     future->init(ceph::ref_t<journal::FutureImpl>());
@@ -237,7 +238,7 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
-  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.1, -1);
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0005, -1);
   auto object = flusher.create_object(oid, 24, &lock);
 
   journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
@@ -248,12 +249,20 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) {
   ASSERT_FALSE(object->append(std::move(append_buffers)));
   lock.unlock();
 
-  journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
-                                                              "payload");
-  append_buffers = {append_buffer2};
-  lock.lock();
-  ASSERT_FALSE(object->append(std::move(append_buffers)));
-  lock.unlock();
+  uint32_t offset  = 0;
+  journal::AppendBuffer append_buffer2;
+  while (!append_buffer1.first->is_flush_in_progress() &&
+         !append_buffer1.first->is_complete()) {
+    usleep(1000);
+
+    append_buffer2 = create_append_buffer(234, 124 + offset, "payload");
+    ++offset;
+    append_buffers = {append_buffer2};
+
+    lock.lock();
+    ASSERT_FALSE(object->append(std::move(append_buffers)));
+    lock.unlock();
+  }
 
   C_SaferCond cond;
   append_buffer2.first->wait(&cond);
@@ -269,7 +278,7 @@ TEST_F(TestObjectRecorder, AppendFilledObject) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   ceph::mutex lock = ceph::make_mutex("object_recorder_lock");
-  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue);
+  ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0, -1);
   auto object = flusher.create_object(oid, 12, &lock);
 
   std::string payload(2048, '1');
@@ -440,8 +449,6 @@ TEST_F(TestObjectRecorder, Overflow) {
   ASSERT_EQ(0, cond.wait());
   ASSERT_EQ(0U, object1->get_pending_appends());
 
-  ASSERT_TRUE(flusher.wait_for_overflow());
-
   auto object2 = flusher.create_object(oid, 12, &lock2);
 
   journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,