From e536b5c8e10e152ef07fc5ee5bb019cea359d249 Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Mon, 4 Nov 2019 20:12:00 -0500 Subject: [PATCH] journal: fix flush by age and in-flight byte tracking The flush by age was always causing an immediate flush due to a backwards comparison. Additionally, the in-flight byte tracker was never decremented which caused premature closure of the journal object. Finally, there was a potential race condition between closing the object and in-flight notification callbacks executing. Now we keep the lock held for both closed and overflow callbacks to prevent the small chance of a race. Fixes: https://tracker.ceph.com/issues/42598 Signed-off-by: Jason Dillaman --- src/journal/ObjectRecorder.cc | 172 ++++++++++++++---------- src/journal/ObjectRecorder.h | 11 +- src/test/journal/test_ObjectRecorder.cc | 29 ++-- 3 files changed, 128 insertions(+), 84 deletions(-) diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index 9d34d190b029f..ca726c114aab0 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -27,7 +27,7 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid, m_op_work_queue(work_queue), m_handler(handler), m_order(order), m_soft_max_size(1 << m_order), m_max_in_flight_appends(max_in_flight_appends), - m_lock(lock), m_last_flush_time(ceph_clock_now()) + m_lock(lock) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); @@ -97,8 +97,8 @@ void ObjectRecorder::flush(Context *on_safe) { // if currently handling flush notifications, wait so that // we notify in the correct order (since lock is dropped on // callback) - if (m_in_flight_flushes) { - m_in_flight_flushes_cond.wait(locker); + while (m_in_flight_callbacks) { + m_in_flight_callbacks_cond.wait(locker); } // attach the flush to the most recent append @@ -125,27 +125,21 @@ void ObjectRecorder::flush(Context *on_safe) { void ObjectRecorder::flush(const ceph::ref_t& future) { ldout(m_cct, 20) << "flushing " << *future << dendl; - m_lock->lock(); - { - auto flush_handler = future->get_flush_handler(); - auto my_handler = get_flush_handler(); - if (flush_handler != my_handler) { - // if we don't own this future, re-issue the flush so that it hits the - // correct journal object owner - future->flush(); - m_lock->unlock(); - return; - } else if (future->is_flush_in_progress()) { - m_lock->unlock(); - return; - } + std::unique_lock locker{*m_lock}; + auto flush_handler = future->get_flush_handler(); + auto my_handler = get_flush_handler(); + if (flush_handler != my_handler) { + // if we don't own this future, re-issue the flush so that it hits the + // correct journal object owner + future->flush(); + return; + } else if (future->is_flush_in_progress()) { + return; } - bool overflowed = send_appends(true, future); - if (overflowed) { - notify_handler_unlock(); - } else { - m_lock->unlock(); + if (!m_object_closed && !m_overflowed && send_appends(true, future)) { + m_in_flight_callbacks = true; + notify_handler_unlock(locker, true); } } @@ -169,52 +163,63 @@ bool ObjectRecorder::close() { ceph_assert(ceph_mutex_is_locked(*m_lock)); ldout(m_cct, 20) << dendl; + send_appends(true, {}); ceph_assert(!m_object_closed); m_object_closed = true; - return (m_in_flight_tids.empty() && !m_in_flight_flushes); + + if (!m_in_flight_tids.empty() || m_in_flight_callbacks) { + m_object_closed_notify = true; + return false; + } + + return true; } void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl; - AppendBuffers append_buffers; - { - m_lock->lock(); - auto tid_iter = m_in_flight_tids.find(tid); - ceph_assert(tid_iter != m_in_flight_tids.end()); - m_in_flight_tids.erase(tid_iter); + std::unique_lock locker{*m_lock}; + m_in_flight_callbacks = true; - InFlightAppends::iterator iter = m_in_flight_appends.find(tid); - ceph_assert(iter != m_in_flight_appends.end()); + auto tid_iter = m_in_flight_tids.find(tid); + ceph_assert(tid_iter != m_in_flight_tids.end()); + m_in_flight_tids.erase(tid_iter); - if (r == -EOVERFLOW) { - ldout(m_cct, 10) << "append overflowed" << dendl; - m_overflowed = true; + InFlightAppends::iterator iter = m_in_flight_appends.find(tid); + ceph_assert(iter != m_in_flight_appends.end()); - // notify of overflow once all in-flight ops are complete - if (m_in_flight_tids.empty()) { - append_overflowed(); - notify_handler_unlock(); - } else { - m_lock->unlock(); - } - return; + bool notify_overflowed = false; + AppendBuffers append_buffers; + if (r == -EOVERFLOW) { + ldout(m_cct, 10) << "append overflowed: " + << "idle=" << m_in_flight_tids.empty() << ", " + << "previous_overflow=" << m_overflowed << dendl; + if (m_in_flight_tids.empty()) { + append_overflowed(); } + if (!m_object_closed && !m_overflowed) { + notify_overflowed = true; + } + m_overflowed = true; + } else { append_buffers.swap(iter->second); ceph_assert(!append_buffers.empty()); for (auto& append_buffer : append_buffers) { - m_object_bytes += append_buffer.second.length(); + auto length = append_buffer.second.length(); + m_object_bytes += length; + + ceph_assert(m_in_flight_bytes >= length); + m_in_flight_bytes -= length; } ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl; m_in_flight_appends.erase(iter); - m_in_flight_flushes = true; - m_lock->unlock(); } + locker.unlock(); // Flag the associated futures as complete. for (auto& append_buffer : append_buffers) { @@ -222,22 +227,16 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { append_buffer.first->safe(r); } - // wake up any flush requests that raced with a RADOS callback - m_lock->lock(); - m_in_flight_flushes = false; - m_in_flight_flushes_cond.notify_all(); - - if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) { - // all remaining unsent appends should be redirected to new object - notify_handler_unlock(); - } else { - bool overflowed = send_appends(false, {}); - if (overflowed) { - notify_handler_unlock(); - } else { - m_lock->unlock(); - } + // attempt to kick off more appends to the object + locker.lock(); + if (!m_object_closed && !m_overflowed && send_appends(false, {})) { + notify_overflowed = true; } + + ldout(m_cct, 20) << "pending tids=" << m_in_flight_tids << dendl; + + // all remaining unsent appends should be redirected to new object + notify_handler_unlock(locker, notify_overflowed); } void ObjectRecorder::append_overflowed() { @@ -280,12 +279,17 @@ bool ObjectRecorder::send_appends(bool force, ceph::ref_t flush_futu if (!force && ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) || (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) || - (m_flush_age > 0 && - m_last_flush_time + m_flush_age >= ceph_clock_now()))) { + (m_flush_age > 0 && !m_last_flush_time.is_zero() && + m_last_flush_time + m_flush_age <= ceph_clock_now()))) { ldout(m_cct, 20) << "forcing batch flush" << dendl; force = true; } + // start tracking flush time after the first append event + if (m_last_flush_time.is_zero()) { + m_last_flush_time = ceph_clock_now(); + } + auto max_in_flight_appends = m_max_in_flight_appends; if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) { if (!force && max_in_flight_appends == 0) { @@ -315,10 +319,10 @@ bool ObjectRecorder::send_appends(bool force, ceph::ref_t flush_futu auto& bl = it->second; auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length(); if (size == m_soft_max_size) { - ldout(m_cct, 10) << "object at capacity " << *future << dendl; + ldout(m_cct, 10) << "object at capacity (" << size << ") " << *future << dendl; m_overflowed = true; } else if (size > m_soft_max_size) { - ldout(m_cct, 10) << "object beyond capacity " << *future << dendl; + ldout(m_cct, 10) << "object beyond capacity (" << size << ") " << *future << dendl; m_overflowed = true; break; } @@ -373,15 +377,43 @@ bool ObjectRecorder::send_appends(bool force, ceph::ref_t flush_futu return m_overflowed; } -void ObjectRecorder::notify_handler_unlock() { +void ObjectRecorder::wake_up_flushes() { ceph_assert(ceph_mutex_is_locked(*m_lock)); - if (m_object_closed) { - m_lock->unlock(); - m_handler->closed(this); - } else { + m_in_flight_callbacks = false; + m_in_flight_callbacks_cond.notify_all(); +} + +void ObjectRecorder::notify_handler_unlock( + std::unique_lock& locker, bool notify_overflowed) { + ceph_assert(ceph_mutex_is_locked(*m_lock)); + ceph_assert(m_in_flight_callbacks); + + if (!m_object_closed && notify_overflowed) { // TODO need to delay completion until after aio_notify completes - m_lock->unlock(); + ldout(m_cct, 10) << "overflow" << dendl; + ceph_assert(m_overflowed); + + locker.unlock(); m_handler->overflow(this); + locker.lock(); + } + + // wake up blocked flush requests + wake_up_flushes(); + + // An overflow notification might have blocked a close. A close + // notification could lead to the immediate destruction of this object + // so the object shouldn't be referenced anymore + bool object_closed_notify = false; + if (m_in_flight_tids.empty()) { + std::swap(object_closed_notify, m_object_closed_notify); + } + ceph_assert(m_object_closed || !object_closed_notify); + locker.unlock(); + + if (object_closed_notify) { + ldout(m_cct, 10) << "closed" << dendl; + m_handler->closed(this); } } diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index 8b4e0a20dc358..1b36d24661245 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -135,20 +135,25 @@ private: InFlightTids m_in_flight_tids; InFlightAppends m_in_flight_appends; uint64_t m_object_bytes = 0; + bool m_overflowed = false; + bool m_object_closed = false; + bool m_object_closed_notify = false; bufferlist m_prefetch_bl; - bool m_in_flight_flushes = false; - ceph::condition_variable m_in_flight_flushes_cond; + bool m_in_flight_callbacks = false; + ceph::condition_variable m_in_flight_callbacks_cond; uint64_t m_in_flight_bytes = 0; bool send_appends(bool force, ceph::ref_t flush_sentinal); void handle_append_flushed(uint64_t tid, int r); void append_overflowed(); - void notify_handler_unlock(); + void wake_up_flushes(); + void notify_handler_unlock(std::unique_lock& locker, + bool notify_overflowed); }; } // namespace journal diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc index 72e4fd9c19442..ac110a23e6fbe 100644 --- a/src/test/journal/test_ObjectRecorder.cc +++ b/src/test/journal/test_ObjectRecorder.cc @@ -118,7 +118,8 @@ public: Handler m_handler; }; - journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid, + journal::AppendBuffer create_append_buffer(uint64_t tag_tid, + uint64_t entry_tid, const std::string &payload) { auto future = ceph::make_ref(tag_tid, entry_tid, 456); future->init(ceph::ref_t()); @@ -237,7 +238,7 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) { ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); - ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.1, -1); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0005, -1); auto object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, @@ -248,12 +249,20 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) { ASSERT_FALSE(object->append(std::move(append_buffers))); lock.unlock(); - journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, - "payload"); - append_buffers = {append_buffer2}; - lock.lock(); - ASSERT_FALSE(object->append(std::move(append_buffers))); - lock.unlock(); + uint32_t offset = 0; + journal::AppendBuffer append_buffer2; + while (!append_buffer1.first->is_flush_in_progress() && + !append_buffer1.first->is_complete()) { + usleep(1000); + + append_buffer2 = create_append_buffer(234, 124 + offset, "payload"); + ++offset; + append_buffers = {append_buffer2}; + + lock.lock(); + ASSERT_FALSE(object->append(std::move(append_buffers))); + lock.unlock(); + } C_SaferCond cond; append_buffer2.first->wait(&cond); @@ -269,7 +278,7 @@ TEST_F(TestObjectRecorder, AppendFilledObject) { ASSERT_EQ(0, init_metadata(metadata)); ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); - ObjectRecorderFlusher flusher(m_ioctx, m_work_queue); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.0, -1); auto object = flusher.create_object(oid, 12, &lock); std::string payload(2048, '1'); @@ -440,8 +449,6 @@ TEST_F(TestObjectRecorder, Overflow) { ASSERT_EQ(0, cond.wait()); ASSERT_EQ(0U, object1->get_pending_appends()); - ASSERT_TRUE(flusher.wait_for_overflow()); - auto object2 = flusher.create_object(oid, 12, &lock2); journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123, -- 2.39.5