From: Ricardo Dias Date: Wed, 7 Sep 2016 14:26:34 +0000 (+0100) Subject: journal: make librados call async in ObjectRecorder X-Git-Tag: v11.0.1~86^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=7b740f5b4ac1c66ac3c80782d2d34e846d00fddd;p=ceph-ci.git journal: make librados call async in ObjectRecorder Signed-off-by: Ricardo Dias --- diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 880130126dd..d28710b3c15 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -98,6 +98,10 @@ public: m_work_queue->queue(on_finish, r); } + inline ContextWQ *get_work_queue() { + return m_work_queue; + } + inline SafeTimer &get_timer() { return *m_timer; } diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index 0113bd4a7b9..1b0e5704f71 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -61,10 +61,12 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, uint8_t splay_width = m_journal_metadata->get_splay_width(); for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { - m_object_locks.push_back(shared_ptr(new Mutex("ObjectRecorder::m_lock::"+ - std::to_string(splay_offset)))); + m_object_locks.push_back(shared_ptr( + new Mutex("ObjectRecorder::m_lock::"+ + std::to_string(splay_offset)))); uint64_t object_number = splay_offset + (m_current_set * splay_width); - m_object_ptrs[splay_offset] = create_object_recorder(object_number, + m_object_ptrs[splay_offset] = create_object_recorder( + object_number, m_object_locks[splay_offset]); } @@ -206,14 +208,17 @@ void JournalRecorder::open_object_set() { for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); it != m_object_ptrs.end(); ++it) { ObjectRecorderPtr object_recorder = it->second; - if (object_recorder->get_object_number() / splay_width != m_current_set) { + uint64_t object_number = object_recorder->get_object_number(); + if (object_number / splay_width != m_current_set) { assert(object_recorder->is_closed()); // ready to close object and open object in active set - create_next_object_recorder(object_recorder); + create_next_object_recorder_unlock(object_recorder); + } else { + uint8_t splay_offset = object_number % splay_width; + m_object_locks[splay_offset]->Unlock(); } } - unlock_object_recorders(); } bool JournalRecorder::close_object_set(uint64_t active_set) { @@ -246,14 +251,14 @@ ObjectRecorderPtr JournalRecorder::create_object_recorder( uint64_t object_number, shared_ptr lock) { ObjectRecorderPtr object_recorder(new ObjectRecorder( m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), - object_number, lock, m_journal_metadata->get_timer(), - m_journal_metadata->get_timer_lock(), &m_object_handler, - m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes, - m_flush_age)); + object_number, lock, m_journal_metadata->get_work_queue(), + m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(), + &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, + m_flush_bytes, m_flush_age)); return object_recorder; } -void JournalRecorder::create_next_object_recorder( +void JournalRecorder::create_next_object_recorder_unlock( ObjectRecorderPtr object_recorder) { assert(m_lock.is_locked()); @@ -279,7 +284,7 @@ void JournalRecorder::create_next_object_recorder( new_object_recorder->get_object_number()); } - new_object_recorder->append(append_buffers, false); + new_object_recorder->append_unlock(append_buffers); m_object_ptrs[splay_offset] = new_object_recorder; } diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index acef0e4b179..7a4af52ee83 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -106,7 +106,7 @@ private: ObjectRecorderPtr create_object_recorder(uint64_t object_number, std::shared_ptr lock); - void create_next_object_recorder(ObjectRecorderPtr object_recorder); + void create_next_object_recorder_unlock(ObjectRecorderPtr object_recorder); void handle_update(); diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index c12ecff02c6..cbd3842da39 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -19,17 +19,18 @@ namespace journal { ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, shared_ptr lock, - SafeTimer &timer, Mutex &timer_lock, - Handler *handler, uint8_t order, - uint32_t flush_interval, uint64_t flush_bytes, - double flush_age) + ContextWQ *work_queue, SafeTimer &timer, + Mutex &timer_lock, Handler *handler, + uint8_t order, uint32_t flush_interval, + uint64_t flush_bytes, double flush_age) : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), - m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), - m_handler(handler), m_order(order), m_soft_max_size(1 << m_order), - m_flush_interval(flush_interval), m_flush_bytes(flush_bytes), - m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL), - m_lock(lock), m_append_tid(0), m_pending_bytes(0), m_size(0), - m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) { + m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer), + m_timer_lock(timer_lock), m_handler(handler), m_order(order), + m_soft_max_size(1 << m_order), m_flush_interval(flush_interval), + m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this), + m_append_task(NULL), m_lock(lock), m_append_tid(0), m_pending_bytes(0), + m_size(0), m_overflowed(false), m_object_closed(false), + m_in_flight_flushes(false), m_aio_scheduled(false) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); assert(m_handler != NULL); @@ -40,51 +41,39 @@ ObjectRecorder::~ObjectRecorder() { assert(m_append_buffers.empty()); assert(m_in_flight_tids.empty()); assert(m_in_flight_appends.empty()); + assert(!m_aio_scheduled); } bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) { - return append(append_buffers, true); -} - -bool ObjectRecorder::append(const AppendBuffers &append_buffers, bool unlock) { assert(m_lock->is_locked()); FutureImplPtr last_flushed_future; bool schedule_append = false; - { - if (m_overflowed) { - m_append_buffers.insert(m_append_buffers.end(), - append_buffers.begin(), append_buffers.end()); - if (unlock) { - m_lock->Unlock(); - } - return false; - } - - for (AppendBuffers::const_iterator iter = append_buffers.begin(); - iter != append_buffers.end(); ++iter) { - if (append(*iter, &schedule_append)) { - last_flushed_future = iter->first; - } - } + + if (m_overflowed) { + m_append_buffers.insert(m_append_buffers.end(), + append_buffers.begin(), append_buffers.end()); + m_lock->Unlock(); + return false; + } - if (unlock) { - m_lock->Unlock(); + for (AppendBuffers::const_iterator iter = append_buffers.begin(); + iter != append_buffers.end(); ++iter) { + if (append(*iter, &schedule_append)) { + last_flushed_future = iter->first; } } if (last_flushed_future) { - if (unlock) { - m_lock->Lock(); - } flush(last_flushed_future); - if (unlock) { - m_lock->Unlock(); - } - } else if (schedule_append) { - schedule_append_task(); + m_lock->Unlock(); } else { - cancel_append_task(); + m_lock->Unlock(); + if (schedule_append) { + schedule_append_task(); + } else { + cancel_append_task(); + } } return (!m_object_closed && !m_overflowed && m_size + m_pending_bytes >= m_soft_max_size); @@ -180,7 +169,7 @@ bool ObjectRecorder::close() { assert(!m_object_closed); m_object_closed = true; - return m_in_flight_tids.empty(); + return m_in_flight_tids.empty() && !m_aio_scheduled; } void ObjectRecorder::handle_append_task() { @@ -268,7 +257,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { } // notify of overflow once all in-flight ops are complete - if (m_in_flight_tids.empty()) { + if (m_in_flight_tids.empty() && !m_aio_scheduled) { notify_handler(); } return; @@ -279,7 +268,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { assert(!append_buffers.empty()); m_in_flight_appends.erase(iter); - if (m_in_flight_appends.empty() && m_object_closed) { + if (m_in_flight_appends.empty() && !m_aio_scheduled && m_object_closed) { // all remaining unsent appends should be redirected to new object notify_handler(); } @@ -331,6 +320,32 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { assert(m_lock->is_locked()); assert(!append_buffers->empty()); + for (AppendBuffers::iterator it = append_buffers->begin(); + it != append_buffers->end(); ++it) { + ldout(m_cct, 20) << __func__ << ": flushing " << *it->first + << dendl; + it->first->set_flush_in_progress(); + m_size += it->second.length(); + } + + m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers, + append_buffers->begin(), append_buffers->end()); + if (!m_aio_scheduled) { + m_op_work_queue->queue(new FunctionContext([this] (int r) { + send_appends_aio(); + })); + m_aio_scheduled = true; + } +} + +void ObjectRecorder::send_appends_aio() { + Mutex::Locker locker(*m_lock); + + m_aio_scheduled = false; + + AppendBuffers append_buffers; + m_pending_buffers.swap(append_buffers); + uint64_t append_tid = m_append_tid++; ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid=" << append_tid << dendl; @@ -339,17 +354,15 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { librados::ObjectWriteOperation op; client::guard_append(&op, m_soft_max_size); - for (AppendBuffers::iterator it = append_buffers->begin(); - it != append_buffers->end(); ++it) { + for (AppendBuffers::iterator it = append_buffers.begin(); + it != append_buffers.end(); ++it) { ldout(m_cct, 20) << __func__ << ": flushing " << *it->first << dendl; - it->first->set_flush_in_progress(); op.append(it->second); op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); - m_size += it->second.length(); } m_in_flight_tids.insert(append_tid); - m_in_flight_appends[append_tid].swap(*append_buffers); + m_in_flight_appends[append_tid].swap(append_buffers); librados::AioCompletion *rados_completion = librados::Rados::aio_create_completion(append_flush, NULL, diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index 7d93ca26c6f..f9cf1065786 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -9,6 +9,7 @@ #include "common/Cond.h" #include "common/Mutex.h" #include "common/RefCountedObj.h" +#include "common/WorkQueue.h" #include "journal/FutureImpl.h" #include #include @@ -38,9 +39,9 @@ public: ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, uint64_t object_number, std::shared_ptr lock, - SafeTimer &timer, Mutex &timer_lock, Handler *handler, - uint8_t order, uint32_t flush_interval, uint64_t flush_bytes, - double flush_age); + ContextWQ *work_queue, SafeTimer &timer, Mutex &timer_lock, + Handler *handler, uint8_t order, uint32_t flush_interval, + uint64_t flush_bytes, double flush_age); ~ObjectRecorder(); inline uint64_t get_object_number() const { @@ -51,7 +52,6 @@ public: } bool append_unlock(const AppendBuffers &append_buffers); - bool append(const AppendBuffers &append_buffers, bool unlock); void flush(Context *on_safe); void flush(const FutureImplPtr &future); @@ -86,6 +86,7 @@ private: object_recorder->put(); } virtual void flush(const FutureImplPtr &future) { + Mutex::Locker locker(*(object_recorder->m_lock)); object_recorder->flush(future); } }; @@ -115,6 +116,8 @@ private: uint64_t m_object_number; CephContext *m_cct; + ContextWQ *m_op_work_queue; + SafeTimer &m_timer; Mutex &m_timer_lock; @@ -147,6 +150,9 @@ private: bool m_in_flight_flushes; Cond m_in_flight_flushes_cond; + AppendBuffers m_pending_buffers; + bool m_aio_scheduled; + void handle_append_task(); void cancel_append_task(); void schedule_append_task(); @@ -156,6 +162,7 @@ private: void handle_append_flushed(uint64_t tid, int r); void append_overflowed(uint64_t tid); void send_appends(AppendBuffers *append_buffers); + void send_appends_aio(); void notify_handler(); }; diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index 2109e1874a2..a23d709d624 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -20,6 +20,7 @@ #include "librbd/journal/CreateRequest.h" #include +#include #define dout_subsys ceph_subsys_rbd #undef dout_prefix @@ -670,50 +671,56 @@ int Journal::demote() { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << __func__ << dendl; - Mutex::Locker locker(m_lock); - assert(m_journaler != nullptr && is_tag_owner(m_lock)); + int r; + C_SaferCond ctx; + Future future; + C_SaferCond flush_ctx; - cls::journal::Client client; - int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client); - if (r < 0) { - lderr(cct) << this << " " << __func__ << ": " - << "failed to retrieve client: " << cpp_strerror(r) << dendl; - return r; - } + { + Mutex::Locker locker(m_lock); + assert(m_journaler != nullptr && is_tag_owner(m_lock)); - assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID); - journal::TagPredecessor predecessor; - predecessor.mirror_uuid = LOCAL_MIRROR_UUID; - if (!client.commit_position.object_positions.empty()) { - auto position = client.commit_position.object_positions.front(); - predecessor.commit_valid = true; - predecessor.tag_tid = position.tag_tid; - predecessor.entry_tid = position.entry_tid; - } + cls::journal::Client client; + r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client); + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": " + << "failed to retrieve client: " << cpp_strerror(r) << dendl; + return r; + } - cls::journal::Tag new_tag; - r = allocate_journaler_tag(cct, m_journaler, client, m_tag_class, - predecessor, ORPHAN_MIRROR_UUID, &new_tag); - if (r < 0) { - return r; - } + assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID); + journal::TagPredecessor predecessor; + predecessor.mirror_uuid = LOCAL_MIRROR_UUID; + if (!client.commit_position.object_positions.empty()) { + auto position = client.commit_position.object_positions.front(); + predecessor.commit_valid = true; + predecessor.tag_tid = position.tag_tid; + predecessor.entry_tid = position.entry_tid; + } - bufferlist::iterator tag_data_bl_it = new_tag.data.begin(); - r = C_DecodeTag::decode(&tag_data_bl_it, &m_tag_data); - if (r < 0) { - lderr(cct) << this << " " << __func__ << ": " - << "failed to decode newly allocated tag" << dendl; - return r; - } + cls::journal::Tag new_tag; + r = allocate_journaler_tag(cct, m_journaler, client, m_tag_class, + predecessor, ORPHAN_MIRROR_UUID, &new_tag); + if (r < 0) { + return r; + } + + bufferlist::iterator tag_data_bl_it = new_tag.data.begin(); + r = C_DecodeTag::decode(&tag_data_bl_it, &m_tag_data); + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": " + << "failed to decode newly allocated tag" << dendl; + return r; + } - journal::EventEntry event_entry{journal::DemoteEvent{}}; - bufferlist event_entry_bl; - ::encode(event_entry, event_entry_bl); + journal::EventEntry event_entry{journal::DemoteEvent{}}; + bufferlist event_entry_bl; + ::encode(event_entry, event_entry_bl); - m_tag_tid = new_tag.tid; - Future future = m_journaler->append(m_tag_tid, event_entry_bl); - C_SaferCond ctx; - future.flush(&ctx); + m_tag_tid = new_tag.tid; + future = m_journaler->append(m_tag_tid, event_entry_bl); + future.flush(&ctx); + } r = ctx.wait(); if (r < 0) { @@ -723,9 +730,11 @@ int Journal::demote() { return r; } - m_journaler->committed(future); - C_SaferCond flush_ctx; - m_journaler->flush_commit_position(&flush_ctx); + { + Mutex::Locker l(m_lock); + m_journaler->committed(future); + m_journaler->flush_commit_position(&flush_ctx); + } r = flush_ctx.wait(); if (r < 0) { @@ -858,21 +867,22 @@ uint64_t Journal::append_io_events(journal::EventType event_type, bool flush_entry) { assert(!bufferlists.empty()); - Futures futures; uint64_t tid; { - { - Mutex::Locker locker(m_lock); - assert(m_state == STATE_READY); + Mutex::Locker locker(m_lock); + assert(m_state == STATE_READY); - tid = ++m_event_tid; - assert(tid != 0); - } + tid = ++m_event_tid; + assert(tid != 0); + } - for (auto &bl : bufferlists) { - assert(bl.length() <= m_max_append_size); - futures.push_back(m_journaler->append(m_tag_tid, bl)); - } + Futures futures; + for (auto &bl : bufferlists) { + assert(bl.length() <= m_max_append_size); + futures.push_back(m_journaler->append(m_tag_tid, bl)); + } + + { Mutex::Locker event_locker(m_event_lock); m_events[tid] = Event(futures, requests, offset, length); } @@ -892,6 +902,7 @@ uint64_t Journal::append_io_events(journal::EventType event_type, } else { futures.back().wait(on_safe); } + return tid; } diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index 374c0bedce1..49a902db917 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -10,6 +10,7 @@ #include "include/interval_set.h" #include "common/Cond.h" #include "common/Mutex.h" +#include "common/Cond.h" #include "journal/Future.h" #include "journal/JournalMetadataListener.h" #include "journal/ReplayEntry.h" @@ -166,6 +167,10 @@ public: int is_resync_requested(bool *do_resync); + inline ContextWQ *get_work_queue() { + return m_work_queue; + } + private: ImageCtxT &m_image_ctx; diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc index b6117918be3..a7cadcfbc7c 100644 --- a/src/test/journal/test_ObjectRecorder.cc +++ b/src/test/journal/test_ObjectRecorder.cc @@ -95,8 +95,8 @@ public: journal::ObjectRecorderPtr create_object(const std::string &oid, uint8_t order, shared_ptr lock) { journal::ObjectRecorderPtr object(new journal::ObjectRecorder( - m_ioctx, oid, 0, lock, *m_timer, m_timer_lock, &m_handler, order, - m_flush_interval, m_flush_bytes, m_flush_age)); + m_ioctx, oid, 0, lock, m_work_queue, *m_timer, m_timer_lock, &m_handler, + order, m_flush_interval, m_flush_bytes, m_flush_age)); m_object_recorders.push_back(object); m_object_recorder_locks.insert(std::make_pair(oid, lock)); m_handler.object_lock = lock; @@ -120,7 +120,6 @@ TEST_F(TestObjectRecorder, Append) { append_buffers = {append_buffer1}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, @@ -128,14 +127,10 @@ TEST_F(TestObjectRecorder, Append) { append_buffers = {append_buffer2}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(2U, object->get_pending_appends()); C_SaferCond cond; - lock->Lock(); append_buffer2.first->flush(&cond); - ASSERT_TRUE(lock->is_locked()); - lock->Unlock(); ASSERT_EQ(0, cond.wait()); ASSERT_EQ(0U, object->get_pending_appends()); } @@ -157,7 +152,6 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) { append_buffers = {append_buffer1}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, @@ -165,7 +159,6 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) { append_buffers = {append_buffer2}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(0U, object->get_pending_appends()); C_SaferCond cond; @@ -190,7 +183,6 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) { append_buffers = {append_buffer1}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, @@ -198,7 +190,6 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) { append_buffers = {append_buffer2}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(0U, object->get_pending_appends()); C_SaferCond cond; @@ -223,14 +214,12 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) { append_buffers = {append_buffer1}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, "payload"); append_buffers = {append_buffer2}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); C_SaferCond cond; append_buffer2.first->wait(&cond); @@ -255,14 +244,12 @@ TEST_F(TestObjectRecorder, AppendFilledObject) { append_buffers = {append_buffer1}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, payload); append_buffers = {append_buffer2}; lock->Lock(); ASSERT_TRUE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); C_SaferCond cond; append_buffer2.first->wait(&cond); @@ -286,7 +273,6 @@ TEST_F(TestObjectRecorder, Flush) { append_buffers = {append_buffer1}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); C_SaferCond cond1; @@ -315,7 +301,6 @@ TEST_F(TestObjectRecorder, FlushFuture) { append_buffers = {append_buffer}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); C_SaferCond cond; @@ -352,7 +337,6 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) { ASSERT_FALSE(append_buffer.first->is_flush_in_progress()); lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); // should automatically flush once its attached to the object C_SaferCond cond; @@ -377,7 +361,6 @@ TEST_F(TestObjectRecorder, Close) { append_buffers = {append_buffer1}; lock->Lock(); ASSERT_FALSE(object->append_unlock(append_buffers)); - ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); lock->Lock(); @@ -421,7 +404,6 @@ TEST_F(TestObjectRecorder, Overflow) { append_buffers = {append_buffer1, append_buffer2}; lock1->Lock(); ASSERT_TRUE(object1->append_unlock(append_buffers)); - ASSERT_FALSE(lock1->is_locked()); C_SaferCond cond; append_buffer2.first->wait(&cond); @@ -434,11 +416,7 @@ TEST_F(TestObjectRecorder, Overflow) { lock2->Lock(); ASSERT_FALSE(object2->append_unlock(append_buffers)); - ASSERT_FALSE(lock2->is_locked()); - lock2->Lock(); append_buffer3.first->flush(NULL); - ASSERT_TRUE(lock2->is_locked()); - lock2->Unlock(); bool overflowed = false; { diff --git a/src/test/librbd/test_mock_Journal.cc b/src/test/librbd/test_mock_Journal.cc index 7cc56ad17b4..6622cf349fa 100644 --- a/src/test/librbd/test_mock_Journal.cc +++ b/src/test/librbd/test_mock_Journal.cc @@ -950,11 +950,13 @@ TEST_F(TestMockJournal, EventAndIOCommitOrder) { expect_append_journaler(mock_journaler); expect_wait_future(mock_future, &on_journal_safe1); ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal)); + mock_journal.get_work_queue()->drain(); Context *on_journal_safe2; expect_append_journaler(mock_journaler); expect_wait_future(mock_future, &on_journal_safe2); ASSERT_EQ(2U, when_append_io_event(mock_image_ctx, mock_journal)); + mock_journal.get_work_queue()->drain(); // commit journal event followed by IO event (standard) on_journal_safe1->complete(0); @@ -998,6 +1000,7 @@ TEST_F(TestMockJournal, AppendWriteEvent) { expect_append_journaler(mock_journaler); expect_wait_future(mock_future, &on_journal_safe); ASSERT_EQ(1U, when_append_write_event(mock_image_ctx, mock_journal, 1 << 17)); + mock_journal.get_work_queue()->drain(); on_journal_safe->complete(0); C_SaferCond event_ctx; @@ -1037,6 +1040,7 @@ TEST_F(TestMockJournal, EventCommitError) { expect_wait_future(mock_future, &on_journal_safe); ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, object_request)); + mock_journal.get_work_queue()->drain(); // commit the event in the journal w/o waiting writeback expect_future_committed(mock_journaler); @@ -1076,6 +1080,7 @@ TEST_F(TestMockJournal, EventCommitErrorWithPendingWriteback) { expect_wait_future(mock_future, &on_journal_safe); ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal, object_request)); + mock_journal.get_work_queue()->drain(); expect_future_is_valid(mock_future); C_SaferCond flush_ctx; @@ -1111,6 +1116,7 @@ TEST_F(TestMockJournal, IOCommitError) { expect_append_journaler(mock_journaler); expect_wait_future(mock_future, &on_journal_safe); ASSERT_EQ(1U, when_append_io_event(mock_image_ctx, mock_journal)); + mock_journal.get_work_queue()->drain(); // failed IO remains uncommitted in journal on_journal_safe->complete(0);