From: Kefu Chai Date: Sun, 7 Jul 2019 04:41:22 +0000 (+0800) Subject: test/journal: s/Mutex/ceph::mutex/ X-Git-Tag: v15.1.0~1971^2~36 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=efee3ae5fbf0dd9ac53bbfbab338f34aa8c4ee65;p=ceph.git test/journal: s/Mutex/ceph::mutex/ as we cannot create a `smart_pointer` due to limitation of `ceph::make_mutex()`, we need to refactor `TestObjectRecorder` to remove its `TearDown()` method, as when `TearDown()` is called, all local variables have been destroyed, including the `ceph::mutex` instances. so we need to introduce a helper of `ObjectRecorderFlusher`, to flush the objects before `mutex` instances are destroyed. to simplify the interfaces, it flushes in its dtor. so its lifecycle should be shorter than those of mutexes. that's why, mutexes are created before `flusher`. Signed-off-by: Kefu Chai --- diff --git a/src/test/journal/RadosTestFixture.cc b/src/test/journal/RadosTestFixture.cc index 78fadb2b72eb..573253801860 100644 --- a/src/test/journal/RadosTestFixture.cc +++ b/src/test/journal/RadosTestFixture.cc @@ -9,7 +9,8 @@ #include "journal/Settings.h" RadosTestFixture::RadosTestFixture() - : m_timer_lock("m_timer_lock"), m_timer(NULL), m_listener(this) { + : m_timer_lock(ceph::make_mutex("m_timer_lock")), + m_listener(this) { } void RadosTestFixture::SetUpTestCase() { @@ -53,7 +54,7 @@ void RadosTestFixture::TearDown() { } { - Mutex::Locker locker(m_timer_lock); + std::lock_guard locker{m_timer_lock}; m_timer->shutdown(); } delete m_timer; @@ -115,10 +116,9 @@ int RadosTestFixture::init_metadata(journal::JournalMetadataPtr metadata) { } bool RadosTestFixture::wait_for_update(journal::JournalMetadataPtr metadata) { - Mutex::Locker locker(m_listener.mutex); + std::unique_lock locker{m_listener.mutex}; while (m_listener.updates[metadata.get()] == 0) { - if (m_listener.cond.WaitInterval( - m_listener.mutex, utime_t(10, 0)) != 0) { + if (m_listener.cond.wait_for(locker, 10s) == std::cv_status::timeout) { return false; } } diff --git a/src/test/journal/RadosTestFixture.h b/src/test/journal/RadosTestFixture.h index 68a96b7d5d7f..9600c839bd2b 100644 --- a/src/test/journal/RadosTestFixture.h +++ b/src/test/journal/RadosTestFixture.h @@ -2,7 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "test/librados/test.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/Timer.h" #include "journal/JournalMetadata.h" #include "cls/journal/cls_journal_types.h" @@ -38,17 +38,17 @@ public: struct Listener : public journal::JournalMetadataListener { RadosTestFixture *test_fixture; - Mutex mutex; - Cond cond; + ceph::mutex mutex = ceph::make_mutex("mutex"); + ceph::condition_variable cond; std::map updates; Listener(RadosTestFixture *_test_fixture) - : test_fixture(_test_fixture), mutex("mutex") {} + : test_fixture(_test_fixture) {} void handle_update(journal::JournalMetadata *metadata) override { - Mutex::Locker locker(mutex); + std::lock_guard locker{mutex}; ++updates[metadata]; - cond.Signal(); + cond.notify_all(); } }; @@ -65,8 +65,8 @@ public: ContextWQ *m_work_queue = nullptr; - Mutex m_timer_lock; - SafeTimer *m_timer; + ceph::mutex m_timer_lock; + SafeTimer *m_timer = nullptr; Listener m_listener; diff --git a/src/test/journal/mock/MockJournaler.h b/src/test/journal/mock/MockJournaler.h index ab424cd6b092..d4e0f6c2aec1 100644 --- a/src/test/journal/mock/MockJournaler.h +++ b/src/test/journal/mock/MockJournaler.h @@ -13,7 +13,6 @@ #include class Context; -class Mutex; namespace journal { @@ -150,7 +149,7 @@ struct MockJournalerProxy { } template - MockJournalerProxy(WorkQueue *work_queue, Timer *timer, Mutex *timer_lock, + MockJournalerProxy(WorkQueue *work_queue, Timer *timer, ceph::mutex *timer_lock, librados::IoCtx &header_ioctx, const std::string &journal_id, const std::string &client_id, const Settings&, diff --git a/src/test/journal/test_FutureImpl.cc b/src/test/journal/test_FutureImpl.cc index 14e9d530ded9..e4e127d6ee9e 100644 --- a/src/test/journal/test_FutureImpl.cc +++ b/src/test/journal/test_FutureImpl.cc @@ -3,7 +3,6 @@ #include "journal/FutureImpl.h" #include "common/Cond.h" -#include "common/Mutex.h" #include "gtest/gtest.h" #include "test/journal/RadosTestFixture.h" diff --git a/src/test/journal/test_JournalMetadata.cc b/src/test/journal/test_JournalMetadata.cc index ce7c1425f15f..7cecb6fb1ce7 100644 --- a/src/test/journal/test_JournalMetadata.cc +++ b/src/test/journal/test_JournalMetadata.cc @@ -4,7 +4,6 @@ #include "journal/JournalMetadata.h" #include "test/journal/RadosTestFixture.h" #include "common/Cond.h" -#include "common/Mutex.h" #include class TestJournalMetadata : public RadosTestFixture { diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc index 0629038ba49a..94490c2357a6 100644 --- a/src/test/journal/test_JournalPlayer.cc +++ b/src/test/journal/test_JournalPlayer.cc @@ -6,8 +6,7 @@ #include "journal/JournalMetadata.h" #include "journal/ReplayHandler.h" #include "include/stringify.h" -#include "common/Cond.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "gtest/gtest.h" #include "test/journal/RadosTestFixture.h" #include @@ -23,30 +22,30 @@ public: static const uint64_t max_fetch_bytes = T::max_fetch_bytes; struct ReplayHandler : public journal::ReplayHandler { - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("lock"); + ceph::condition_variable cond; bool entries_available; bool complete; int complete_result; ReplayHandler() - : lock("lock"), entries_available(false), complete(false), + : entries_available(false), complete(false), complete_result(0) {} void get() override {} void put() override {} void handle_entries_available() override { - Mutex::Locker locker(lock); + std::lock_guard locker{lock}; entries_available = true; - cond.Signal(); + cond.notify_all(); } void handle_complete(int r) override { - Mutex::Locker locker(lock); + std::lock_guard locker{lock}; complete = true; complete_result = r; - cond.Signal(); + cond.notify_all(); } }; @@ -97,11 +96,11 @@ public: break; } - Mutex::Locker locker(m_replay_hander.lock); + std::unique_lock locker{m_replay_hander.lock}; if (m_replay_hander.entries_available) { m_replay_hander.entries_available = false; - } else if (m_replay_hander.cond.WaitInterval( - m_replay_hander.lock, utime_t(10, 0)) != 0) { + } else if (m_replay_hander.cond.wait_for(locker, 10s) == + std::cv_status::timeout) { break; } } @@ -109,14 +108,14 @@ public: } bool wait_for_complete(journal::JournalPlayer *player) { - Mutex::Locker locker(m_replay_hander.lock); + std::unique_lock locker{m_replay_hander.lock}; while (!m_replay_hander.complete) { journal::Entry entry; uint64_t commit_tid; player->try_pop_front(&entry, &commit_tid); - if (m_replay_hander.cond.WaitInterval( - m_replay_hander.lock, utime_t(10, 0)) != 0) { + if (m_replay_hander.cond.wait_for(locker, 10s) == + std::cv_status::timeout) { return false; } } diff --git a/src/test/journal/test_ObjectPlayer.cc b/src/test/journal/test_ObjectPlayer.cc index c8774cab0852..8e3f3323bba7 100644 --- a/src/test/journal/test_ObjectPlayer.cc +++ b/src/test/journal/test_ObjectPlayer.cc @@ -4,7 +4,6 @@ #include "journal/ObjectPlayer.h" #include "journal/Entry.h" #include "include/stringify.h" -#include "common/Mutex.h" #include "common/Timer.h" #include "gtest/gtest.h" #include "test/librados/test.h" diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc index 3cc8e893cfe0..2df5bf0faaf2 100644 --- a/src/test/journal/test_ObjectRecorder.cc +++ b/src/test/journal/test_ObjectRecorder.cc @@ -3,7 +3,7 @@ #include "journal/ObjectRecorder.h" #include "common/Cond.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/Timer.h" #include "gtest/gtest.h" #include "test/librados/test.h" @@ -14,71 +14,106 @@ using std::shared_ptr; class TestObjectRecorder : public RadosTestFixture { public: - TestObjectRecorder() - : m_flush_interval(std::numeric_limits::max()), - m_flush_bytes(std::numeric_limits::max()), - m_flush_age(600) - { - } + TestObjectRecorder() = default; struct Handler : public journal::ObjectRecorder::Handler { - Mutex lock; - shared_ptr object_lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("lock"); + ceph::mutex* object_lock = nullptr; + ceph::condition_variable cond; bool is_closed = false; uint32_t overflows = 0; - Handler() : lock("lock") { - } + Handler() = default; void closed(journal::ObjectRecorder *object_recorder) override { - Mutex::Locker locker(lock); + std::lock_guard locker{lock}; is_closed = true; - cond.Signal(); + cond.notify_all(); } void overflow(journal::ObjectRecorder *object_recorder) override { - Mutex::Locker locker(lock); + std::lock_guard locker{lock}; journal::AppendBuffers append_buffers; - object_lock->Lock(); + object_lock->lock(); object_recorder->claim_append_buffers(&append_buffers); - object_lock->Unlock(); + object_lock->unlock(); ++overflows; - cond.Signal(); + cond.notify_all(); } }; - typedef std::list ObjectRecorders; - typedef std::map> ObjectRecorderLocksMap; - - ObjectRecorders m_object_recorders; - ObjectRecorderLocksMap m_object_recorder_locks; - - uint32_t m_flush_interval; - uint64_t m_flush_bytes; - double m_flush_age; - uint64_t m_max_in_flight_appends = 0; - Handler m_handler; - - void TearDown() override { - for (ObjectRecorders::iterator it = m_object_recorders.begin(); - it != m_object_recorders.end(); ++it) { - C_SaferCond cond; - (*it)->flush(&cond); - cond.wait(); + // flush the pending buffers in dtor + class ObjectRecorderFlusher { + public: + ObjectRecorderFlusher(librados::IoCtx& ioctx, + ContextWQ* work_queue) + : m_ioctx{ioctx}, + m_work_queue{work_queue} + {} + ObjectRecorderFlusher(librados::IoCtx& ioctx, + ContextWQ* work_queue, + uint32_t flush_interval, + uint16_t flush_bytes, + double flush_age, + int max_in_flight) + : m_ioctx{ioctx}, + m_work_queue{work_queue}, + m_flush_interval{flush_interval}, + m_flush_bytes{flush_bytes}, + m_flush_age{flush_age}, + m_max_in_flight_appends{max_in_flight < 0 ? + std::numeric_limits::max() : + static_cast(max_in_flight)} + {} + ~ObjectRecorderFlusher() { + for (auto& object_recorder : m_object_recorders) { + C_SaferCond cond; + object_recorder->flush(&cond); + cond.wait(); + } } - m_object_recorders.clear(); - - RadosTestFixture::TearDown(); - } - - inline void set_batch_options(uint32_t flush_interval, uint64_t flush_bytes, - double flush_age, int max_in_flight) { - m_flush_interval = flush_interval; - m_flush_bytes = flush_bytes; - m_flush_age = flush_age; - m_max_in_flight_appends = max_in_flight; - } + journal::ObjectRecorderPtr create_object(const std::string& oid, + uint8_t order, + ceph::mutex* lock) { + journal::ObjectRecorderPtr object(new journal::ObjectRecorder( + m_ioctx, oid, 0, lock, m_work_queue, &m_handler, + order, m_max_in_flight_appends)); + { + std::lock_guard locker{*lock}; + object->set_append_batch_options(m_flush_interval, + m_flush_bytes, + m_flush_age); + } + m_object_recorders.push_back(object); + m_handler.object_lock = lock; + return object; + } + bool wait_for_closed() { + std::unique_lock locker{m_handler.lock}; + return m_handler.cond.wait_for(locker, 10s, + [this] { return m_handler.is_closed; }); + } + bool wait_for_overflow() { + std::unique_lock locker{m_handler.lock}; + if (m_handler.cond.wait_for(locker, 10s, + [this] { return m_handler.overflows > 0; })) { + m_handler.overflows = 0; + return true; + } else { + return false; + } + } + private: + librados::IoCtx& m_ioctx; + ContextWQ *m_work_queue; + uint32_t m_flush_interval = std::numeric_limits::max(); + uint64_t m_flush_bytes = std::numeric_limits::max(); + double m_flush_age = 600; + uint64_t m_max_in_flight_appends = 0; + using ObjectRecorders = std::list; + ObjectRecorders m_object_recorders; + Handler m_handler; + }; journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid, const std::string &payload) { @@ -90,22 +125,6 @@ public: bl.append(payload); return std::make_pair(future, bl); } - - journal::ObjectRecorderPtr create_object(const std::string &oid, - uint8_t order, shared_ptr lock) { - journal::ObjectRecorderPtr object(new journal::ObjectRecorder( - m_ioctx, oid, 0, lock, m_work_queue, &m_handler, order, - m_max_in_flight_appends)); - { - Mutex::Locker locker(*lock); - object->set_append_batch_options(m_flush_interval, m_flush_bytes, - m_flush_age); - } - m_object_recorders.push_back(object); - m_object_recorder_locks.insert(std::make_pair(oid, lock)); - m_handler.object_lock = lock; - return object; - } }; TEST_F(TestObjectRecorder, Append) { @@ -115,25 +134,25 @@ TEST_F(TestObjectRecorder, Append) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - set_batch_options(0, 0, 0, 0); - shared_ptr lock(new Mutex("object_recorder_lock")); - journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0, 0); + journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); ASSERT_EQ(0U, object->get_pending_appends()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, "payload"); append_buffers = {append_buffer2}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); ASSERT_EQ(0U, object->get_pending_appends()); C_SaferCond cond; @@ -149,25 +168,25 @@ TEST_F(TestObjectRecorder, AppendFlushByCount) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - set_batch_options(2, 0, 0, -1); - shared_ptr lock(new Mutex("object_recorder_lock")); - journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1); + journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); ASSERT_EQ(1U, object->get_pending_appends()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, "payload"); append_buffers = {append_buffer2}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); ASSERT_EQ(0U, object->get_pending_appends()); C_SaferCond cond; @@ -182,25 +201,25 @@ TEST_F(TestObjectRecorder, AppendFlushByBytes) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - set_batch_options(0, 10, 0, -1); - shared_ptr lock(new Mutex("object_recorder_lock")); - journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1); + journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); ASSERT_EQ(1U, object->get_pending_appends()); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, "payload"); append_buffers = {append_buffer2}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); ASSERT_EQ(0U, object->get_pending_appends()); C_SaferCond cond; @@ -215,24 +234,24 @@ TEST_F(TestObjectRecorder, AppendFlushByAge) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - set_batch_options(0, 0, 0.1, -1); - shared_ptr lock(new Mutex("object_recorder_lock")); - journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 0, 0.1, -1); + journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, "payload"); append_buffers = {append_buffer2}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); C_SaferCond cond; append_buffer2.first->wait(&cond); @@ -247,24 +266,25 @@ TEST_F(TestObjectRecorder, AppendFilledObject) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - shared_ptr lock(new Mutex("object_recorder_lock")); - journal::ObjectRecorderPtr object = create_object(oid, 12, lock); + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue); + journal::ObjectRecorderPtr object = flusher.create_object(oid, 12, &lock); std::string payload(2048, '1'); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, payload); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124, payload); append_buffers = {append_buffer2}; - lock->Lock(); + lock.lock(); ASSERT_TRUE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); C_SaferCond cond; append_buffer2.first->wait(&cond); @@ -279,17 +299,17 @@ TEST_F(TestObjectRecorder, Flush) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - set_batch_options(0, 10, 0, -1); - shared_ptr lock(new Mutex("object_recorder_lock")); - journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1); + journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); ASSERT_EQ(1U, object->get_pending_appends()); C_SaferCond cond1; @@ -309,17 +329,17 @@ TEST_F(TestObjectRecorder, FlushFuture) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - set_batch_options(0, 10, 0, -1); - shared_ptr lock(new Mutex("object_recorder_lock")); - journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 0, 10, 0, -1); + journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); ASSERT_EQ(1U, object->get_pending_appends()); C_SaferCond cond; @@ -337,8 +357,9 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - shared_ptr lock(new Mutex("object_recorder_lock")); - journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue); + journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer = create_append_buffer(234, 123, "payload"); @@ -348,9 +369,9 @@ TEST_F(TestObjectRecorder, FlushDetachedFuture) { object->flush(append_buffer.first); ASSERT_FALSE(append_buffer.first->is_flush_in_progress()); - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); // should automatically flush once its attached to the object C_SaferCond cond; @@ -365,35 +386,26 @@ TEST_F(TestObjectRecorder, Close) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - set_batch_options(2, 0, 0, -1); - shared_ptr lock(new Mutex("object_recorder_lock")); - journal::ObjectRecorderPtr object = create_object(oid, 24, lock); + ceph::mutex lock = ceph::make_mutex("object_recorder_lock"); + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue, 2, 0, 0, -1); + journal::ObjectRecorderPtr object = flusher.create_object(oid, 24, &lock); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, "payload"); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1}; - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->append(std::move(append_buffers))); - lock->Unlock(); + lock.unlock(); ASSERT_EQ(1U, object->get_pending_appends()); - lock->Lock(); + lock.lock(); ASSERT_FALSE(object->close()); - ASSERT_TRUE(lock->is_locked()); - lock->Unlock(); - - { - Mutex::Locker locker(m_handler.lock); - while (!m_handler.is_closed) { - if (m_handler.cond.WaitInterval( - m_handler.lock, utime_t(10, 0)) != 0) { - break; - } - } - } + ASSERT_TRUE(ceph_mutex_is_locked(lock)); + lock.unlock(); + + ASSERT_TRUE(flusher.wait_for_closed()); - ASSERT_TRUE(m_handler.is_closed); ASSERT_EQ(0U, object->get_pending_appends()); } @@ -404,8 +416,11 @@ TEST_F(TestObjectRecorder, Overflow) { journal::JournalMetadataPtr metadata = create_metadata(oid); ASSERT_EQ(0, init_metadata(metadata)); - shared_ptr lock1(new Mutex("object_recorder_lock_1")); - journal::ObjectRecorderPtr object1 = create_object(oid, 12, lock1); + ceph::mutex lock1 = ceph::make_mutex("object_recorder_lock_1"); + ceph::mutex lock2 = ceph::make_mutex("object_recorder_lock_2"); + + ObjectRecorderFlusher flusher(m_ioctx, m_work_queue); + journal::ObjectRecorderPtr object1 = flusher.create_object(oid, 12, &lock1); std::string payload(1 << 11, '1'); journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123, @@ -414,56 +429,26 @@ TEST_F(TestObjectRecorder, Overflow) { payload); journal::AppendBuffers append_buffers; append_buffers = {append_buffer1, append_buffer2}; - lock1->Lock(); + lock1.lock(); ASSERT_TRUE(object1->append(std::move(append_buffers))); - lock1->Unlock(); + lock1.unlock(); C_SaferCond cond; append_buffer2.first->wait(&cond); ASSERT_EQ(0, cond.wait()); ASSERT_EQ(0U, object1->get_pending_appends()); - bool overflowed = false; - { - Mutex::Locker locker(m_handler.lock); - while (m_handler.overflows == 0) { - if (m_handler.cond.WaitInterval( - m_handler.lock, utime_t(10, 0)) != 0) { - break; - } - } - if (m_handler.overflows != 0) { - overflowed = true; - m_handler.overflows = 0; - } - } + ASSERT_TRUE(flusher.wait_for_overflow()); - ASSERT_TRUE(overflowed); - - shared_ptr lock2(new Mutex("object_recorder_lock_2")); - journal::ObjectRecorderPtr object2 = create_object(oid, 12, lock2); + journal::ObjectRecorderPtr object2 = flusher.create_object(oid, 12, &lock2); journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123, payload); append_buffers = {append_buffer3}; - lock2->Lock(); + lock2.lock(); ASSERT_FALSE(object2->append(std::move(append_buffers))); - lock2->Unlock(); + lock2.unlock(); append_buffer3.first->flush(NULL); - overflowed = false; - { - Mutex::Locker locker(m_handler.lock); - while (m_handler.overflows == 0) { - if (m_handler.cond.WaitInterval( - m_handler.lock, utime_t(10, 0)) != 0) { - break; - } - } - if (m_handler.overflows != 0) { - overflowed = true; - } - } - - ASSERT_TRUE(overflowed); + ASSERT_TRUE(flusher.wait_for_overflow()); }