From: Ricardo Dias Date: Mon, 25 Jul 2016 16:00:50 +0000 (+0100) Subject: journal: increase concurrency of journal recorder X-Git-Tag: v11.0.1~86^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5c88edd68a1ee7c77f11e4113251fbe5768b8d99;p=ceph.git journal: increase concurrency of journal recorder Signed-off-by: Ricardo Dias --- diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index 4cbe7391ee94..0113bd4a7b92 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -10,6 +10,8 @@ #undef dout_prefix #define dout_prefix *_dout << "JournalRecorder: " << this << " " +using std::shared_ptr; + namespace journal { namespace { @@ -59,8 +61,11 @@ JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, uint8_t splay_width = m_journal_metadata->get_splay_width(); for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { + m_object_locks.push_back(shared_ptr(new Mutex("ObjectRecorder::m_lock::"+ + std::to_string(splay_offset)))); uint64_t object_number = splay_offset + (m_current_set * splay_width); - m_object_ptrs[splay_offset] = create_object_recorder(object_number); + m_object_ptrs[splay_offset] = create_object_recorder(object_number, + m_object_locks[splay_offset]); } m_journal_metadata->add_listener(&m_listener); @@ -77,7 +82,7 @@ JournalRecorder::~JournalRecorder() { Future JournalRecorder::append(uint64_t tag_tid, const bufferlist &payload_bl) { - Mutex::Locker locker(m_lock); + m_lock.Lock(); uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid); uint8_t splay_width = m_journal_metadata->get_splay_width(); @@ -90,6 +95,9 @@ Future JournalRecorder::append(uint64_t tag_tid, future->init(m_prev_future); m_prev_future = future; + m_object_locks[splay_offset]->Lock(); + m_lock.Unlock(); + bufferlist entry_bl; ::encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl), entry_bl); @@ -97,11 +105,12 @@ Future JournalRecorder::append(uint64_t tag_tid, AppendBuffers append_buffers; append_buffers.push_back(std::make_pair(future, entry_bl)); - bool object_full = object_ptr->append(append_buffers); + bool object_full = object_ptr->append_unlock(append_buffers); if (object_full) { ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" << dendl; + Mutex::Locker l(m_lock); close_and_advance_object_set(object_ptr->get_object_number() / splay_width); } return Future(future); @@ -117,6 +126,7 @@ void JournalRecorder::flush(Context *on_safe) { it != m_object_ptrs.end(); ++it) { it->second->flush(ctx); } + } // avoid holding the lock in case there is nothing to flush @@ -191,6 +201,8 @@ void JournalRecorder::open_object_set() { << dendl; uint8_t splay_width = m_journal_metadata->get_splay_width(); + + lock_object_recorders(); for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); it != m_object_ptrs.end(); ++it) { ObjectRecorderPtr object_recorder = it->second; @@ -201,6 +213,7 @@ void JournalRecorder::open_object_set() { create_next_object_recorder(object_recorder); } } + unlock_object_recorders(); } bool JournalRecorder::close_object_set(uint64_t active_set) { @@ -209,6 +222,7 @@ bool JournalRecorder::close_object_set(uint64_t active_set) { // object recorders will invoke overflow handler as they complete // closing the object to ensure correct order of future appends uint8_t splay_width = m_journal_metadata->get_splay_width(); + lock_object_recorders(); for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); it != m_object_ptrs.end(); ++it) { ObjectRecorderPtr object_recorder = it->second; @@ -224,14 +238,15 @@ bool JournalRecorder::close_object_set(uint64_t active_set) { } } } + unlock_object_recorders(); return (m_in_flight_object_closes == 0); } ObjectRecorderPtr JournalRecorder::create_object_recorder( - uint64_t object_number) { + uint64_t object_number, shared_ptr lock) { ObjectRecorderPtr object_recorder(new ObjectRecorder( m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), - object_number, m_journal_metadata->get_timer(), + object_number, lock, m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(), &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes, m_flush_age)); @@ -246,8 +261,10 @@ void JournalRecorder::create_next_object_recorder( uint8_t splay_width = m_journal_metadata->get_splay_width(); uint8_t splay_offset = object_number % splay_width; + assert(m_object_locks[splay_offset]->is_locked()); + ObjectRecorderPtr new_object_recorder = create_object_recorder( - (m_current_set * splay_width) + splay_offset); + (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]); ldout(m_cct, 10) << __func__ << ": " << "old oid=" << object_recorder->get_oid() << ", " @@ -262,7 +279,7 @@ void JournalRecorder::create_next_object_recorder( new_object_recorder->get_object_number()); } - new_object_recorder->append(append_buffers); + new_object_recorder->append(append_buffers, false); m_object_ptrs[splay_offset] = new_object_recorder; } diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h index 6ed2e6316236..acef0e4b1795 100644 --- a/src/journal/JournalRecorder.h +++ b/src/journal/JournalRecorder.h @@ -92,6 +92,7 @@ private: uint32_t m_in_flight_object_closes = 0; uint64_t m_current_set; ObjectRecorderPtrs m_object_ptrs; + std::vector> m_object_locks; FutureImplPtr m_prev_future; @@ -103,13 +104,26 @@ private: void close_and_advance_object_set(uint64_t object_set); - ObjectRecorderPtr create_object_recorder(uint64_t object_number); + ObjectRecorderPtr create_object_recorder(uint64_t object_number, + std::shared_ptr lock); void create_next_object_recorder(ObjectRecorderPtr object_recorder); void handle_update(); void handle_closed(ObjectRecorder *object_recorder); void handle_overflow(ObjectRecorder *object_recorder); + + void lock_object_recorders() { + for (auto& lock : m_object_locks) { + lock->Lock(); + } + } + + void unlock_object_recorders() { + for (auto& lock : m_object_locks) { + lock->Unlock(); + } + } }; } // namespace journal diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index 0cf2fd1e8ea0..c12ecff02c6a 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -13,11 +13,12 @@ #define dout_prefix *_dout << "ObjectRecorder: " << this << " " using namespace cls::journal; +using std::shared_ptr; namespace journal { ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, - uint64_t object_number, + uint64_t object_number, shared_ptr lock, SafeTimer &timer, Mutex &timer_lock, Handler *handler, uint8_t order, uint32_t flush_interval, uint64_t flush_bytes, @@ -27,9 +28,8 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, m_handler(handler), m_order(order), m_soft_max_size(1 << m_order), m_flush_interval(flush_interval), m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this), m_append_task(NULL), - m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)), - m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false), - m_object_closed(false), m_in_flight_flushes(false) { + m_lock(lock), m_append_tid(0), m_pending_bytes(0), m_size(0), + m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); assert(m_handler != NULL); @@ -42,14 +42,22 @@ ObjectRecorder::~ObjectRecorder() { assert(m_in_flight_appends.empty()); } -bool ObjectRecorder::append(const AppendBuffers &append_buffers) { +bool ObjectRecorder::append_unlock(const AppendBuffers &append_buffers) { + return append(append_buffers, true); +} + +bool ObjectRecorder::append(const AppendBuffers &append_buffers, bool unlock) { + assert(m_lock->is_locked()); + FutureImplPtr last_flushed_future; bool schedule_append = false; { - Mutex::Locker locker(m_lock); if (m_overflowed) { m_append_buffers.insert(m_append_buffers.end(), append_buffers.begin(), append_buffers.end()); + if (unlock) { + m_lock->Unlock(); + } return false; } @@ -59,10 +67,20 @@ bool ObjectRecorder::append(const AppendBuffers &append_buffers) { last_flushed_future = iter->first; } } + + if (unlock) { + m_lock->Unlock(); + } } if (last_flushed_future) { + if (unlock) { + m_lock->Lock(); + } flush(last_flushed_future); + if (unlock) { + m_lock->Unlock(); + } } else if (schedule_append) { schedule_append_task(); } else { @@ -78,13 +96,13 @@ void ObjectRecorder::flush(Context *on_safe) { cancel_append_task(); Future future; { - Mutex::Locker locker(m_lock); + Mutex::Locker locker(*m_lock); // if currently handling flush notifications, wait so that // we notify in the correct order (since lock is dropped on // callback) if (m_in_flight_flushes) { - m_in_flight_flushes_cond.Wait(m_lock); + m_in_flight_flushes_cond.Wait(*(m_lock.get())); } // attach the flush to the most recent append @@ -110,7 +128,8 @@ void ObjectRecorder::flush(const FutureImplPtr &future) { ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future << dendl; - Mutex::Locker locker(m_lock); + assert(m_lock->is_locked()); + if (future->get_flush_handler().get() != &m_flush_handler) { // if we don't own this future, re-issue the flush so that it hits the // correct journal object owner @@ -142,7 +161,7 @@ void ObjectRecorder::flush(const FutureImplPtr &future) { void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; - Mutex::Locker locker(m_lock); + assert(m_lock->is_locked()); assert(m_in_flight_tids.empty()); assert(m_in_flight_appends.empty()); assert(m_object_closed || m_overflowed); @@ -151,11 +170,12 @@ void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { } bool ObjectRecorder::close() { + assert (m_lock->is_locked()); + ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; cancel_append_task(); - Mutex::Locker locker(m_lock); flush_appends(true); assert(!m_object_closed); @@ -167,7 +187,7 @@ void ObjectRecorder::handle_append_task() { assert(m_timer_lock.is_locked()); m_append_task = NULL; - Mutex::Locker locker(m_lock); + Mutex::Locker locker(*m_lock); flush_appends(true); } @@ -189,7 +209,7 @@ void ObjectRecorder::schedule_append_task() { bool ObjectRecorder::append(const AppendBuffer &append_buffer, bool *schedule_append) { - assert(m_lock.is_locked()); + assert(m_lock->is_locked()); bool flush_requested = false; if (!m_object_closed && !m_overflowed) { @@ -206,7 +226,7 @@ bool ObjectRecorder::append(const AppendBuffer &append_buffer, } bool ObjectRecorder::flush_appends(bool force) { - assert(m_lock.is_locked()); + assert(m_lock->is_locked()); if (m_object_closed || m_overflowed) { return true; } @@ -232,7 +252,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { AppendBuffers append_buffers; { - Mutex::Locker locker(m_lock); + Mutex::Locker locker(*m_lock); auto tid_iter = m_in_flight_tids.find(tid); assert(tid_iter != m_in_flight_tids.end()); m_in_flight_tids.erase(tid_iter); @@ -275,7 +295,7 @@ void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { } // wake up any flush requests that raced with a RADOS callback - Mutex::Locker locker(m_lock); + Mutex::Locker locker(*m_lock); m_in_flight_flushes = false; m_in_flight_flushes_cond.Signal(); } @@ -284,7 +304,7 @@ void ObjectRecorder::append_overflowed(uint64_t tid) { ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed" << dendl; - assert(m_lock.is_locked()); + assert(m_lock->is_locked()); assert(!m_in_flight_appends.empty()); assert(m_in_flight_appends.begin()->first == tid); @@ -308,7 +328,7 @@ void ObjectRecorder::append_overflowed(uint64_t tid) { } void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { - assert(m_lock.is_locked()); + assert(m_lock->is_locked()); assert(!append_buffers->empty()); uint64_t append_tid = m_append_tid++; @@ -340,7 +360,7 @@ void ObjectRecorder::send_appends(AppendBuffers *append_buffers) { } void ObjectRecorder::notify_handler() { - assert(m_lock.is_locked()); + assert(m_lock->is_locked()); for (AppendBuffers::const_iterator it = m_append_buffers.begin(); it != m_append_buffers.end(); ++it) { @@ -350,14 +370,14 @@ void ObjectRecorder::notify_handler() { } if (m_object_closed) { - m_lock.Unlock(); + m_lock->Unlock(); m_handler->closed(this); - m_lock.Lock(); + m_lock->Lock(); } else { // TODO need to delay completion until after aio_notify completes - m_lock.Unlock(); + m_lock->Unlock(); m_handler->overflow(this); - m_lock.Lock(); + m_lock->Lock(); } } diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index 53f8cc9ad0db..7d93ca26c6f0 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -37,9 +37,10 @@ public: }; ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, - uint64_t object_number, SafeTimer &timer, Mutex &timer_lock, - Handler *handler, uint8_t order, uint32_t flush_interval, - uint64_t flush_bytes, double flush_age); + uint64_t object_number, std::shared_ptr lock, + SafeTimer &timer, Mutex &timer_lock, Handler *handler, + uint8_t order, uint32_t flush_interval, uint64_t flush_bytes, + double flush_age); ~ObjectRecorder(); inline uint64_t get_object_number() const { @@ -49,14 +50,15 @@ public: return m_oid; } - bool append(const AppendBuffers &append_buffers); + bool append_unlock(const AppendBuffers &append_buffers); + bool append(const AppendBuffers &append_buffers, bool unlock); void flush(Context *on_safe); void flush(const FutureImplPtr &future); void claim_append_buffers(AppendBuffers *append_buffers); bool is_closed() const { - Mutex::Locker locker(m_lock); + assert(m_lock->is_locked()); return (m_object_closed && m_in_flight_appends.empty()); } bool close(); @@ -66,7 +68,7 @@ public: } inline size_t get_pending_appends() const { - Mutex::Locker locker(m_lock); + Mutex::Locker locker(*m_lock); return m_append_buffers.size(); } @@ -129,7 +131,7 @@ private: C_AppendTask *m_append_task; - mutable Mutex m_lock; + mutable std::shared_ptr m_lock; AppendBuffers m_append_buffers; uint64_t m_append_tid; uint32_t m_pending_bytes; diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index 180ac46be2ca..2109e1874a2e 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -861,17 +861,19 @@ uint64_t Journal::append_io_events(journal::EventType event_type, Futures futures; uint64_t tid; { - Mutex::Locker locker(m_lock); - assert(m_state == STATE_READY); + { + Mutex::Locker locker(m_lock); + assert(m_state == STATE_READY); - Mutex::Locker event_locker(m_event_lock); - tid = ++m_event_tid; - assert(tid != 0); + tid = ++m_event_tid; + assert(tid != 0); + } for (auto &bl : bufferlists) { assert(bl.length() <= m_max_append_size); futures.push_back(m_journaler->append(m_tag_tid, bl)); } + Mutex::Locker event_locker(m_event_lock); m_events[tid] = Event(futures, requests, offset, length); } diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc index de82d0620951..b6117918be3b 100644 --- a/src/test/journal/test_ObjectRecorder.cc +++ b/src/test/journal/test_ObjectRecorder.cc @@ -10,6 +10,8 @@ #include "test/journal/RadosTestFixture.h" #include +using std::shared_ptr; + class TestObjectRecorder : public RadosTestFixture { public: TestObjectRecorder() @@ -21,6 +23,7 @@ public: struct Handler : public journal::ObjectRecorder::Handler { Mutex lock; + shared_ptr object_lock; Cond cond; bool is_closed = false; uint32_t overflows = 0; @@ -36,7 +39,9 @@ public: virtual void overflow(journal::ObjectRecorder *object_recorder) { Mutex::Locker locker(lock); journal::AppendBuffers append_buffers; + object_lock->Lock(); object_recorder->claim_append_buffers(&append_buffers); + object_lock->Unlock(); ++overflows; cond.Signal(); @@ -44,8 +49,10 @@ public: }; typedef std::list ObjectRecorders; + typedef std::map> ObjectRecorderLocksMap; ObjectRecorders m_object_recorders; + ObjectRecorderLocksMap m_object_recorder_locks; uint32_t m_flush_interval; uint64_t m_flush_bytes; @@ -86,11 +93,13 @@ public: } journal::ObjectRecorderPtr create_object(const std::string &oid, - uint8_t order) { + uint8_t order, shared_ptr lock) { journal::ObjectRecorderPtr object(new journal::ObjectRecorder( - m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_handler, order, + m_ioctx, oid, 0, lock, *m_timer, m_timer_lock, &m_handler, order, m_flush_interval, m_flush_bytes, m_flush_age)); m_object_recorders.push_back(object); + m_object_recorder_locks.insert(std::make_pair(oid, lock)); + m_handler.object_lock = lock; return object; } }; @@ -102,23 +111,31 @@ TEST_F(TestObjectRecorder, Append) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::ObjectRecorderPtr object = create_object(oid, 24); + shared_ptr lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, "payload"); append_buffers = {append_buffer2}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(2U, object->get_pending_appends()); C_SaferCond cond; + lock->Lock(); append_buffer2.first->flush(&cond); + ASSERT_TRUE(lock->is_locked()); + lock->Unlock(); ASSERT_EQ(0, cond.wait()); ASSERT_EQ(0U, object->get_pending_appends()); } @@ -131,19 +148,24 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) { ASSERT_EQ(0, init_metadata(metadata)); set_flush_interval(2); - journal::ObjectRecorderPtr object = create_object(oid, 24); + shared_ptr lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, "payload"); append_buffers = {append_buffer2}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(0U, object->get_pending_appends()); C_SaferCond cond; @@ -159,19 +181,24 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) { ASSERT_EQ(0, init_metadata(metadata)); set_flush_bytes(10); - journal::ObjectRecorderPtr object = create_object(oid, 24); + shared_ptr lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, "payload"); append_buffers = {append_buffer2}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(0U, object->get_pending_appends()); C_SaferCond cond; @@ -187,18 +214,23 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) { ASSERT_EQ(0, init_metadata(metadata)); set_flush_age(0.1); - journal::ObjectRecorderPtr object = create_object(oid, 24); + shared_ptr lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, "payload"); append_buffers = {append_buffer2}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); C_SaferCond cond; append_buffer2.first->wait(&cond); @@ -213,19 +245,24 @@ TEST_F(TestObjectRecorder, AppendFilledObject) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::ObjectRecorderPtr object = create_object(oid, 12); + shared_ptr lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 12, lock); std::string payload(2048, '1'); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, payload); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, payload); append_buffers = {append_buffer2}; - ASSERT_TRUE(object->append(append_buffers)); + lock->Lock(); + ASSERT_TRUE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); C_SaferCond cond; append_buffer2.first->wait(&cond); @@ -240,13 +277,16 @@ TEST_F(TestObjectRecorder, Flush) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::ObjectRecorderPtr object = create_object(oid, 24); + shared_ptr lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); C_SaferCond cond1; @@ -266,18 +306,24 @@ TEST_F(TestObjectRecorder, FlushFuture) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::ObjectRecorderPtr object = create_object(oid, 24); + shared_ptr lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); journal::AppendBuffer append_buffer = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); C_SaferCond cond; append_buffer.first->wait(&cond); + lock->Lock(); object->flush(append_buffer.first); + ASSERT_TRUE(lock->is_locked()); + lock->Unlock(); ASSERT_TRUE(append_buffer.first->is_flush_in_progress() || append_buffer.first->is_complete()); ASSERT_EQ(0, cond.wait()); @@ -290,7 +336,8 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::ObjectRecorderPtr object = create_object(oid, 24); + shared_ptr lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); journal::AppendBuffer append_buffer = create_append_buffer(234, 123, "payload"); @@ -298,9 +345,14 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) { journal::AppendBuffers append_buffers; append_buffers = {append_buffer}; + lock->Lock(); object->flush(append_buffer.first); + ASSERT_TRUE(lock->is_locked()); + lock->Unlock(); ASSERT_FALSE(append_buffer.first->is_flush_in_progress()); - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); // should automatically flush once its attached to the object C_SaferCond cond; @@ -316,16 +368,22 @@ TEST_F(TestObjectRecorder, Close) { ASSERT_EQ(0, init_metadata(metadata)); set_flush_interval(2); - journal::ObjectRecorderPtr object = create_object(oid, 24); + shared_ptr lock(new Mutex("object_recorder_lock")); + journal::ObjectRecorderPtr object = create_object(oid, 24, lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - ASSERT_FALSE(object->append(append_buffers)); + lock->Lock(); + ASSERT_FALSE(object->append_unlock(append_buffers)); + ASSERT_FALSE(lock->is_locked()); ASSERT_EQ(1U, object->get_pending_appends()); + lock->Lock(); ASSERT_FALSE(object->close()); + ASSERT_TRUE(lock->is_locked()); + lock->Unlock(); { Mutex::Locker locker(m_handler.lock); @@ -349,8 +407,10 @@ TEST_F(TestObjectRecorder, Overflow) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - journal::ObjectRecorderPtr object1 = create_object(oid, 12); - journal::ObjectRecorderPtr object2 = create_object(oid, 12); + shared_ptr lock1(new Mutex("object_recorder_lock_1")); + journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1); + shared_ptr lock2(new Mutex("object_recorder_lock_2")); + journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2); std::string payload(2048, '1'); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, @@ -359,7 +419,9 @@ TEST_F(TestObjectRecorder, Overflow) { payload); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1, append_buffer2}; - ASSERT_TRUE(object1->append(append_buffers)); + lock1->Lock(); + ASSERT_TRUE(object1->append_unlock(append_buffers)); + ASSERT_FALSE(lock1->is_locked()); C_SaferCond cond; append_buffer2.first->wait(&cond); @@ -370,8 +432,13 @@ TEST_F(TestObjectRecorder, Overflow) { payload); append_buffers = {append_buffer3}; - ASSERT_FALSE(object2->append(append_buffers)); + lock2->Lock(); + ASSERT_FALSE(object2->append_unlock(append_buffers)); + ASSERT_FALSE(lock2->is_locked()); + lock2->Lock(); append_buffer3.first->flush(NULL); + ASSERT_TRUE(lock2->is_locked()); + lock2->Unlock(); bool overflowed = false; {