From bf0142c6c9a4ac6d1d4bca2ddf8a044302d811a6 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Sat, 20 Jul 2019 01:13:38 +0800 Subject: [PATCH] tools/rbd_mirror: s/Mutex/ceph::Formatter/ * s/Formatter/ceph::Formatter/ in tools/rbd_mirror/ImageSyncThrottler, as we cannot rely on `using ceph` or `using ceph::Formatter` in some other included header files. Signed-off-by: Kefu Chai --- .../test_mock_SnapshotPurgeRequest.cc | 14 +- .../image_deleter/test_mock_TrashWatcher.cc | 7 +- .../test_mock_BootstrapRequest.cc | 4 +- .../test_mock_CreateImageRequest.cc | 2 +- .../test_mock_PrepareRemoteImageRequest.cc | 2 +- src/test/rbd_mirror/random_write.cc | 19 +-- src/test/rbd_mirror/test_ClusterWatcher.cc | 9 +- src/test/rbd_mirror/test_ImageDeleter.cc | 4 +- src/test/rbd_mirror/test_ImageReplayer.cc | 21 +-- src/test/rbd_mirror/test_ImageSync.cc | 19 ++- src/test/rbd_mirror/test_LeaderWatcher.cc | 17 +- src/test/rbd_mirror/test_PoolWatcher.cc | 17 +- src/test/rbd_mirror/test_mock_ImageMap.cc | 48 +++--- .../rbd_mirror/test_mock_ImageReplayer.cc | 2 +- .../rbd_mirror/test_mock_InstanceReplayer.cc | 12 +- .../rbd_mirror/test_mock_InstanceWatcher.cc | 2 +- .../rbd_mirror/test_mock_LeaderWatcher.cc | 6 +- src/test/rbd_mirror/test_mock_PoolReplayer.cc | 4 +- src/test/rbd_mirror/test_mock_PoolWatcher.cc | 45 +++-- src/tools/rbd_mirror/ClusterWatcher.cc | 6 +- src/tools/rbd_mirror/ClusterWatcher.h | 6 +- src/tools/rbd_mirror/ImageDeleter.cc | 58 +++---- src/tools/rbd_mirror/ImageDeleter.h | 11 +- src/tools/rbd_mirror/ImageMap.cc | 47 ++--- src/tools/rbd_mirror/ImageMap.h | 6 +- src/tools/rbd_mirror/ImageReplayer.cc | 104 ++++++------ src/tools/rbd_mirror/ImageReplayer.h | 20 +-- src/tools/rbd_mirror/ImageSync.cc | 36 ++-- src/tools/rbd_mirror/ImageSync.h | 10 +- src/tools/rbd_mirror/ImageSyncThrottler.cc | 21 +-- src/tools/rbd_mirror/ImageSyncThrottler.h | 6 +- src/tools/rbd_mirror/InstanceReplayer.cc | 46 ++--- src/tools/rbd_mirror/InstanceReplayer.h | 4 +- src/tools/rbd_mirror/InstanceWatcher.cc | 111 ++++++------ src/tools/rbd_mirror/InstanceWatcher.h | 4 +- src/tools/rbd_mirror/Instances.cc | 59 +++---- src/tools/rbd_mirror/Instances.h | 11 +- src/tools/rbd_mirror/LeaderWatcher.cc | 160 ++++++++---------- src/tools/rbd_mirror/LeaderWatcher.h | 12 +- src/tools/rbd_mirror/Mirror.cc | 38 ++--- src/tools/rbd_mirror/Mirror.h | 6 +- src/tools/rbd_mirror/PoolReplayer.cc | 60 +++---- src/tools/rbd_mirror/PoolReplayer.h | 7 +- src/tools/rbd_mirror/PoolWatcher.cc | 42 +++-- src/tools/rbd_mirror/PoolWatcher.h | 6 +- src/tools/rbd_mirror/ServiceDaemon.cc | 23 ++- src/tools/rbd_mirror/ServiceDaemon.h | 4 +- src/tools/rbd_mirror/Threads.cc | 4 +- src/tools/rbd_mirror/Threads.h | 5 +- .../image_deleter/SnapshotPurgeRequest.cc | 28 +-- .../image_deleter/TrashMoveRequest.cc | 8 +- .../rbd_mirror/image_deleter/TrashWatcher.cc | 41 +++-- .../rbd_mirror/image_deleter/TrashWatcher.h | 4 +- src/tools/rbd_mirror/image_deleter/Types.h | 2 +- src/tools/rbd_mirror/image_map/Policy.cc | 32 ++-- src/tools/rbd_mirror/image_map/Policy.h | 7 +- .../image_replayer/BootstrapRequest.cc | 26 +-- .../image_replayer/BootstrapRequest.h | 5 +- .../image_replayer/CreateImageRequest.cc | 8 +- .../image_replayer/EventPreprocessor.cc | 2 +- .../image_replayer/OpenLocalImageRequest.cc | 12 +- .../image_replayer/ReplayStatusFormatter.cc | 8 +- .../image_replayer/ReplayStatusFormatter.h | 4 +- .../image_sync/SyncPointPruneRequest.cc | 2 +- 64 files changed, 674 insertions(+), 702 deletions(-) diff --git a/src/test/rbd_mirror/image_deleter/test_mock_SnapshotPurgeRequest.cc b/src/test/rbd_mirror/image_deleter/test_mock_SnapshotPurgeRequest.cc index f0a44812715..dde78029611 100644 --- a/src/test/rbd_mirror/image_deleter/test_mock_SnapshotPurgeRequest.cc +++ b/src/test/rbd_mirror/image_deleter/test_mock_SnapshotPurgeRequest.cc @@ -158,7 +158,7 @@ public: TEST_F(TestMockImageDeleterSnapshotPurgeRequest, Success) { { - RWLock::WLocker image_locker(m_local_image_ctx->image_lock); + std::unique_lock image_locker{m_local_image_ctx->image_lock}; m_local_image_ctx->add_snap(cls::rbd::UserSnapshotNamespace{}, "snap1", 1, 0, {}, RBD_PROTECTION_STATUS_PROTECTED, 0, {}); m_local_image_ctx->add_snap(cls::rbd::UserSnapshotNamespace{}, "snap2", 2, @@ -206,7 +206,7 @@ TEST_F(TestMockImageDeleterSnapshotPurgeRequest, Success) { TEST_F(TestMockImageDeleterSnapshotPurgeRequest, OpenError) { { - RWLock::WLocker image_locker(m_local_image_ctx->image_lock); + std::unique_lock image_locker{m_local_image_ctx->image_lock}; m_local_image_ctx->add_snap(cls::rbd::UserSnapshotNamespace{}, "snap1", 1, 0, {}, RBD_PROTECTION_STATUS_UNPROTECTED, 0, {}); @@ -230,7 +230,7 @@ TEST_F(TestMockImageDeleterSnapshotPurgeRequest, OpenError) { TEST_F(TestMockImageDeleterSnapshotPurgeRequest, AcquireLockError) { { - RWLock::WLocker image_locker(m_local_image_ctx->image_lock); + std::unique_lock image_locker{m_local_image_ctx->image_lock}; m_local_image_ctx->add_snap(cls::rbd::UserSnapshotNamespace{}, "snap1", 1, 0, {}, RBD_PROTECTION_STATUS_UNPROTECTED, 0, {}); @@ -256,7 +256,7 @@ TEST_F(TestMockImageDeleterSnapshotPurgeRequest, AcquireLockError) { TEST_F(TestMockImageDeleterSnapshotPurgeRequest, SnapUnprotectBusy) { { - RWLock::WLocker image_locker(m_local_image_ctx->image_lock); + std::unique_lock image_locker{m_local_image_ctx->image_lock}; m_local_image_ctx->add_snap(cls::rbd::UserSnapshotNamespace{}, "snap1", 1, 0, {}, RBD_PROTECTION_STATUS_PROTECTED, 0, {}); } @@ -290,7 +290,7 @@ TEST_F(TestMockImageDeleterSnapshotPurgeRequest, SnapUnprotectBusy) { TEST_F(TestMockImageDeleterSnapshotPurgeRequest, SnapUnprotectError) { { - RWLock::WLocker image_locker(m_local_image_ctx->image_lock); + std::unique_lock image_locker{m_local_image_ctx->image_lock}; m_local_image_ctx->add_snap(cls::rbd::UserSnapshotNamespace{}, "snap1", 1, 0, {}, RBD_PROTECTION_STATUS_PROTECTED, 0, {}); } @@ -324,7 +324,7 @@ TEST_F(TestMockImageDeleterSnapshotPurgeRequest, SnapUnprotectError) { TEST_F(TestMockImageDeleterSnapshotPurgeRequest, SnapRemoveError) { { - RWLock::WLocker image_locker(m_local_image_ctx->image_lock); + std::unique_lock image_locker{m_local_image_ctx->image_lock}; m_local_image_ctx->add_snap(cls::rbd::UserSnapshotNamespace{}, "snap1", 1, 0, {}, RBD_PROTECTION_STATUS_UNPROTECTED, 0, {}); @@ -359,7 +359,7 @@ TEST_F(TestMockImageDeleterSnapshotPurgeRequest, SnapRemoveError) { TEST_F(TestMockImageDeleterSnapshotPurgeRequest, CloseError) { { - RWLock::WLocker image_locker(m_local_image_ctx->image_lock); + std::unique_lock image_locker{m_local_image_ctx->image_lock}; m_local_image_ctx->add_snap(cls::rbd::UserSnapshotNamespace{}, "snap1", 1, 0, {}, RBD_PROTECTION_STATUS_UNPROTECTED, 0, {}); diff --git a/src/test/rbd_mirror/image_deleter/test_mock_TrashWatcher.cc b/src/test/rbd_mirror/image_deleter/test_mock_TrashWatcher.cc index df78edd2db3..d1a707ed803 100644 --- a/src/test/rbd_mirror/image_deleter/test_mock_TrashWatcher.cc +++ b/src/test/rbd_mirror/image_deleter/test_mock_TrashWatcher.cc @@ -81,7 +81,7 @@ namespace mirror { template <> struct Threads { MockSafeTimer *timer; - Mutex &timer_lock; + ceph::mutex &timer_lock; MockContextWQ *work_queue; @@ -122,7 +122,8 @@ public: typedef librbd::TrashWatcher LibrbdTrashWatcher; struct MockListener : TrashListener { - MOCK_METHOD2(handle_trash_image, void(const std::string&, const utime_t&)); + MOCK_METHOD2(handle_trash_image, void(const std::string&, + const ceph::real_clock::time_point&)); }; void expect_work_queue(MockThreads &mock_threads) { @@ -180,7 +181,7 @@ public: .WillOnce(DoAll(WithArg<1>(Invoke([this](Context *ctx) { auto wrapped_ctx = new FunctionContext([this, ctx](int r) { - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; ctx->complete(r); }); m_threads->work_queue->queue(wrapped_ctx, 0); diff --git a/src/test/rbd_mirror/image_replayer/test_mock_BootstrapRequest.cc b/src/test/rbd_mirror/image_replayer/test_mock_BootstrapRequest.cc index 6c7c30a2cfd..341c6373f61 100644 --- a/src/test/rbd_mirror/image_replayer/test_mock_BootstrapRequest.cc +++ b/src/test/rbd_mirror/image_replayer/test_mock_BootstrapRequest.cc @@ -57,7 +57,7 @@ class ProgressContext; template <> struct Threads { - Mutex &timer_lock; + ceph::mutex &timer_lock; SafeTimer *timer; ContextWQ *work_queue; @@ -75,7 +75,7 @@ struct ImageSync { static ImageSync* create( librbd::MockTestImageCtx *local_image_ctx, librbd::MockTestImageCtx *remote_image_ctx, - SafeTimer *timer, Mutex *timer_lock, + SafeTimer *timer, ceph::mutex *timer_lock, const std::string &mirror_uuid, ::journal::MockJournaler *journaler, librbd::journal::MirrorPeerClientMeta *client_meta, ContextWQ *work_queue, InstanceWatcher *instance_watcher, diff --git a/src/test/rbd_mirror/image_replayer/test_mock_CreateImageRequest.cc b/src/test/rbd_mirror/image_replayer/test_mock_CreateImageRequest.cc index 8e13e8b37ef..58dfa80967e 100644 --- a/src/test/rbd_mirror/image_replayer/test_mock_CreateImageRequest.cc +++ b/src/test/rbd_mirror/image_replayer/test_mock_CreateImageRequest.cc @@ -122,7 +122,7 @@ namespace mirror { template <> struct Threads { - Mutex &timer_lock; + ceph::mutex &timer_lock; SafeTimer *timer; ContextWQ *work_queue; diff --git a/src/test/rbd_mirror/image_replayer/test_mock_PrepareRemoteImageRequest.cc b/src/test/rbd_mirror/image_replayer/test_mock_PrepareRemoteImageRequest.cc index 4e74df82e10..9814b9bfb53 100644 --- a/src/test/rbd_mirror/image_replayer/test_mock_PrepareRemoteImageRequest.cc +++ b/src/test/rbd_mirror/image_replayer/test_mock_PrepareRemoteImageRequest.cc @@ -38,7 +38,7 @@ namespace mirror { template <> struct Threads { - Mutex &timer_lock; + ceph::mutex &timer_lock; SafeTimer *timer; ContextWQ *work_queue; diff --git a/src/test/rbd_mirror/random_write.cc b/src/test/rbd_mirror/random_write.cc index 34145206da6..16693d2ae41 100644 --- a/src/test/rbd_mirror/random_write.cc +++ b/src/test/rbd_mirror/random_write.cc @@ -42,20 +42,19 @@ void rbd_bencher_completion(void *c, void *pc); struct rbd_bencher { librbd::Image *image; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("rbd_bencher::lock"); + ceph::condition_variable cond; int in_flight; explicit rbd_bencher(librbd::Image *i) : image(i), - lock("rbd_bencher::lock"), in_flight(0) { } bool start_write(int max, uint64_t off, uint64_t len, bufferlist& bl, int op_flags) { { - Mutex::Locker l(lock); + std::lock_guard l{lock}; if (in_flight >= max) return false; in_flight++; @@ -68,11 +67,9 @@ struct rbd_bencher { } void wait_for(int max) { - Mutex::Locker l(lock); + std::unique_lock l{lock}; while (in_flight > max) { - utime_t dur; - dur.set_from_double(.2); - cond.WaitInterval(lock, dur); + cond.wait_for(l, 200ms); } } @@ -87,10 +84,10 @@ void rbd_bencher_completion(void *vc, void *pc) { cout << "write error: " << cpp_strerror(ret) << std::endl; exit(ret < 0 ? -ret : ret); } - b->lock.Lock(); + b->lock.lock(); b->in_flight--; - b->cond.Signal(); - b->lock.Unlock(); + b->cond.notify_all(); + b->lock.unlock(); c->release(); } diff --git a/src/test/rbd_mirror/test_ClusterWatcher.cc b/src/test/rbd_mirror/test_ClusterWatcher.cc index 05ca2c1080f..7ac01d5f71e 100644 --- a/src/test/rbd_mirror/test_ClusterWatcher.cc +++ b/src/test/rbd_mirror/test_ClusterWatcher.cc @@ -3,7 +3,7 @@ #include "include/rados/librados.hpp" #include "common/Cond.h" #include "common/errno.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "librbd/internal.h" #include "librbd/api/Mirror.h" #include "tools/rbd_mirror/ClusterWatcher.h" @@ -32,8 +32,7 @@ void register_test_cluster_watcher() { class TestClusterWatcher : public ::rbd::mirror::TestFixture { public: - TestClusterWatcher() : m_lock("TestClusterWatcherLock") - { + TestClusterWatcher() { m_cluster = std::make_shared(); EXPECT_EQ("", connect_cluster_pp(*m_cluster)); } @@ -162,12 +161,12 @@ public: void check_peers() { m_cluster_watcher->refresh_pools(); - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; ASSERT_EQ(m_pool_peers, m_cluster_watcher->get_pool_peers()); } RadosRef m_cluster; - Mutex m_lock; + ceph::mutex m_lock = ceph::make_mutex("TestClusterWatcherLock"); unique_ptr> m_service_daemon; unique_ptr m_cluster_watcher; diff --git a/src/test/rbd_mirror/test_ImageDeleter.cc b/src/test/rbd_mirror/test_ImageDeleter.cc index bdb7c833ae7..9772094b256 100644 --- a/src/test/rbd_mirror/test_ImageDeleter.cc +++ b/src/test/rbd_mirror/test_ImageDeleter.cc @@ -158,7 +158,7 @@ public: false); EXPECT_EQ(0, ictx->state->open(0)); { - RWLock::WLocker image_locker(ictx->image_lock); + std::unique_lock image_locker{ictx->image_lock}; ictx->set_journal_policy(new librbd::journal::DisabledPolicy()); } @@ -178,7 +178,7 @@ public: false); EXPECT_EQ(0, ictx->state->open(0)); { - RWLock::WLocker image_locker(ictx->image_lock); + std::unique_lock image_locker{ictx->image_lock}; ictx->set_journal_policy(new librbd::journal::DisabledPolicy()); } diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc index 4911d5e6970..f48ac29436f 100644 --- a/src/test/rbd_mirror/test_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_ImageReplayer.cc @@ -56,12 +56,12 @@ public: struct C_WatchCtx : public librados::WatchCtx2 { TestImageReplayer *test; std::string oid; - Mutex lock; - Cond cond; + ceph::mutex lock = ceph::make_mutex("C_WatchCtx::lock"); + ceph::condition_variable cond; bool notified; C_WatchCtx(TestImageReplayer *test, const std::string &oid) - : test(test), oid(oid), lock("C_WatchCtx::lock"), notified(false) { + : test(test), oid(oid), notified(false) { } void handle_notify(uint64_t notify_id, uint64_t cookie, @@ -69,9 +69,9 @@ public: bufferlist bl; test->m_remote_ioctx.notify_ack(oid, notify_id, cookie, bl); - Mutex::Locker locker(lock); + std::lock_guard locker{lock}; notified = true; - cond.Signal(); + cond.notify_all(); } void handle_error(uint64_t cookie, int err) override { @@ -284,10 +284,11 @@ public: return false; } - Mutex::Locker locker(m_watch_ctx->lock); + std::unique_lock locker{m_watch_ctx->lock}; while (!m_watch_ctx->notified) { - if (m_watch_ctx->cond.WaitInterval(m_watch_ctx->lock, - utime_t(seconds, 0)) != 0) { + if (m_watch_ctx->cond.wait_for(locker, + std::chrono::seconds(seconds)) == + std::cv_status::timeout) { return false; } } @@ -805,7 +806,7 @@ TEST_F(TestImageReplayer, MultipleReplayFailures_SingleEpoch) { // race failed op shut down with new ops open_remote_image(&ictx); for (uint64_t i = 0; i < 10; ++i) { - RWLock::RLocker owner_locker(ictx->owner_lock); + std::shared_lock owner_locker{ictx->owner_lock}; C_SaferCond request_lock; ictx->exclusive_lock->acquire_lock(&request_lock); ASSERT_EQ(0, request_lock.wait()); @@ -859,7 +860,7 @@ TEST_F(TestImageReplayer, MultipleReplayFailures_MultiEpoch) { // race failed op shut down with new tag flush open_remote_image(&ictx); { - RWLock::RLocker owner_locker(ictx->owner_lock); + std::shared_lock owner_locker{ictx->owner_lock}; C_SaferCond request_lock; ictx->exclusive_lock->acquire_lock(&request_lock); ASSERT_EQ(0, request_lock.wait()); diff --git a/src/test/rbd_mirror/test_ImageSync.cc b/src/test/rbd_mirror/test_ImageSync.cc index 49fba242316..e46005c3a77 100644 --- a/src/test/rbd_mirror/test_ImageSync.cc +++ b/src/test/rbd_mirror/test_ImageSync.cc @@ -4,6 +4,7 @@ #include "test/rbd_mirror/test_fixture.h" #include "include/stringify.h" #include "include/rbd/librbd.hpp" +#include "common/Cond.h" #include "journal/Journaler.h" #include "journal/Settings.h" #include "librbd/ExclusiveLock.h" @@ -58,7 +59,7 @@ void scribble(librbd::ImageCtx *image_ctx, int num_ops, uint64_t max_size) } } - RWLock::RLocker owner_locker(image_ctx->owner_lock); + std::shared_lock owner_locker{image_ctx->owner_lock}; ASSERT_EQ(0, flush(image_ctx)); } @@ -104,7 +105,7 @@ public: C_SaferCond ctx; { - RWLock::RLocker owner_locker((*image_ctx)->owner_lock); + std::shared_lock owner_locker{(*image_ctx)->owner_lock}; (*image_ctx)->exclusive_lock->try_acquire_lock(&ctx); } ASSERT_EQ(0, ctx.wait()); @@ -178,7 +179,7 @@ TEST_F(TestImageSync, Resize) { std::move(bl), 0)); { - RWLock::RLocker owner_locker(m_remote_image_ctx->owner_lock); + std::shared_lock owner_locker{m_remote_image_ctx->owner_lock}; ASSERT_EQ(0, flush(m_remote_image_ctx)); } @@ -220,7 +221,7 @@ TEST_F(TestImageSync, Discard) { std::move(bl), 0)); { - RWLock::RLocker owner_locker(m_remote_image_ctx->owner_lock); + std::shared_lock owner_locker{m_remote_image_ctx->owner_lock}; ASSERT_EQ(0, flush(m_remote_image_ctx)); } @@ -230,7 +231,7 @@ TEST_F(TestImageSync, Discard) { m_remote_image_ctx->io_work_queue->discard( off + 1, len - 2, m_remote_image_ctx->discard_granularity_bytes)); { - RWLock::RLocker owner_locker(m_remote_image_ctx->owner_lock); + std::shared_lock owner_locker{m_remote_image_ctx->owner_lock}; ASSERT_EQ(0, flush(m_remote_image_ctx)); } @@ -289,7 +290,7 @@ TEST_F(TestImageSync, SnapshotStress) { for (auto &snap_name : snap_names) { uint64_t remote_snap_id; { - RWLock::RLocker remote_image_locker(m_remote_image_ctx->image_lock); + std::shared_lock remote_image_locker{m_remote_image_ctx->image_lock}; remote_snap_id = m_remote_image_ctx->get_snap_id( cls::rbd::UserSnapshotNamespace{}, snap_name); } @@ -300,14 +301,14 @@ TEST_F(TestImageSync, SnapshotStress) { m_remote_image_ctx->state->snap_set(remote_snap_id, &ctx); ASSERT_EQ(0, ctx.wait()); - RWLock::RLocker remote_image_locker(m_remote_image_ctx->image_lock); + std::shared_lock remote_image_locker{m_remote_image_ctx->image_lock}; remote_size = m_remote_image_ctx->get_image_size( m_remote_image_ctx->snap_id); } uint64_t local_snap_id; { - RWLock::RLocker image_locker(m_local_image_ctx->image_lock); + std::shared_lock image_locker{m_local_image_ctx->image_lock}; local_snap_id = m_local_image_ctx->get_snap_id( cls::rbd::UserSnapshotNamespace{}, snap_name); } @@ -318,7 +319,7 @@ TEST_F(TestImageSync, SnapshotStress) { m_local_image_ctx->state->snap_set(local_snap_id, &ctx); ASSERT_EQ(0, ctx.wait()); - RWLock::RLocker image_locker(m_local_image_ctx->image_lock); + std::shared_lock image_locker{m_local_image_ctx->image_lock}; local_size = m_local_image_ctx->get_image_size( m_local_image_ctx->snap_id); bool flags_set; diff --git a/src/test/rbd_mirror/test_LeaderWatcher.cc b/src/test/rbd_mirror/test_LeaderWatcher.cc index 8a5cd89078c..ac8f1bb0af9 100644 --- a/src/test/rbd_mirror/test_LeaderWatcher.cc +++ b/src/test/rbd_mirror/test_LeaderWatcher.cc @@ -25,33 +25,34 @@ public: class Listener : public rbd::mirror::leader_watcher::Listener { public: Listener() - : m_test_lock(unique_lock_name("LeaderWatcher::m_test_lock", this)) { + : m_test_lock(ceph::make_mutex( + unique_lock_name("LeaderWatcher::m_test_lock", this))) { } void on_acquire(int r, Context *ctx) { - Mutex::Locker locker(m_test_lock); + std::lock_guard locker{m_test_lock}; m_on_acquire_r = r; m_on_acquire = ctx; } void on_release(int r, Context *ctx) { - Mutex::Locker locker(m_test_lock); + std::lock_guard locker{m_test_lock}; m_on_release_r = r; m_on_release = ctx; } int acquire_count() const { - Mutex::Locker locker(m_test_lock); + std::lock_guard locker{m_test_lock}; return m_acquire_count; } int release_count() const { - Mutex::Locker locker(m_test_lock); + std::lock_guard locker{m_test_lock}; return m_release_count; } void post_acquire_handler(Context *on_finish) override { - Mutex::Locker locker(m_test_lock); + std::lock_guard locker{m_test_lock}; m_acquire_count++; on_finish->complete(m_on_acquire_r); m_on_acquire_r = 0; @@ -62,7 +63,7 @@ public: } void pre_release_handler(Context *on_finish) override { - Mutex::Locker locker(m_test_lock); + std::lock_guard locker{m_test_lock}; m_release_count++; on_finish->complete(m_on_release_r); m_on_release_r = 0; @@ -81,7 +82,7 @@ public: } private: - mutable Mutex m_test_lock; + mutable ceph::mutex m_test_lock; int m_acquire_count = 0; int m_release_count = 0; int m_on_acquire_r = 0; diff --git a/src/test/rbd_mirror/test_PoolWatcher.cc b/src/test/rbd_mirror/test_PoolWatcher.cc index 108dc355684..482e80e0d19 100644 --- a/src/test/rbd_mirror/test_PoolWatcher.cc +++ b/src/test/rbd_mirror/test_PoolWatcher.cc @@ -16,7 +16,7 @@ #include "librbd/api/Mirror.h" #include "common/Cond.h" #include "common/errno.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "tools/rbd_mirror/PoolWatcher.h" #include "tools/rbd_mirror/Threads.h" #include "tools/rbd_mirror/Types.h" @@ -46,7 +46,7 @@ class TestPoolWatcher : public ::rbd::mirror::TestFixture { public: TestPoolWatcher() - : m_lock("TestPoolWatcherLock"), m_pool_watcher_listener(this), + : m_pool_watcher_listener(this), m_image_number(0), m_snap_number(0) { m_cluster = std::make_shared(); @@ -70,7 +70,7 @@ public: struct PoolWatcherListener : public rbd::mirror::pool_watcher::Listener { TestPoolWatcher *test; - Cond cond; + ceph::condition_variable cond; ImageIds image_ids; explicit PoolWatcherListener(TestPoolWatcher *test) : test(test) { @@ -79,12 +79,12 @@ public: void handle_update(const std::string &mirror_uuid, ImageIds &&added_image_ids, ImageIds &&removed_image_ids) override { - Mutex::Locker locker(test->m_lock); + std::lock_guard locker{test->m_lock}; for (auto &image_id : removed_image_ids) { image_ids.erase(image_id); } image_ids.insert(added_image_ids.begin(), added_image_ids.end()); - cond.Signal(); + cond.notify_all(); } }; @@ -204,10 +204,9 @@ public: } void check_images() { - Mutex::Locker l(m_lock); + std::unique_lock l{m_lock}; while (m_mirrored_images != m_pool_watcher_listener.image_ids) { - if (m_pool_watcher_listener.cond.WaitInterval( - m_lock, utime_t(10, 0)) != 0) { + if (m_pool_watcher_listener.cond.wait_for(l, 10s) == std::cv_status::timeout) { break; } } @@ -215,7 +214,7 @@ public: ASSERT_EQ(m_mirrored_images, m_pool_watcher_listener.image_ids); } - Mutex m_lock; + ceph::mutex m_lock = ceph::make_mutex("TestPoolWatcherLock"); RadosRef m_cluster; PoolWatcherListener m_pool_watcher_listener; unique_ptr > m_pool_watcher; diff --git a/src/test/rbd_mirror/test_mock_ImageMap.cc b/src/test/rbd_mirror/test_mock_ImageMap.cc index e223b8b9dc6..4dfd669259a 100644 --- a/src/test/rbd_mirror/test_mock_ImageMap.cc +++ b/src/test/rbd_mirror/test_mock_ImageMap.cc @@ -34,7 +34,7 @@ namespace mirror { template <> struct Threads { MockSafeTimer *timer; - Mutex &timer_lock; + ceph::mutex &timer_lock; MockContextWQ *work_queue; @@ -160,11 +160,7 @@ public: } }; - TestMockImageMap() - : m_lock("TestMockImageMap::m_lock"), - m_notify_update_count(0), - m_map_update_count(0) { - } + TestMockImageMap() = default; void SetUp() override { TestFixture::SetUp(); @@ -193,7 +189,7 @@ public: EXPECT_CALL(*mock_threads.timer, add_event_after(_,_)) .WillOnce(DoAll(WithArg<1>(Invoke([this](Context *ctx) { auto wrapped_ctx = new FunctionContext([this, ctx](int r) { - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; ctx->complete(r); }); m_threads->work_queue->queue(wrapped_ctx, 0); @@ -208,7 +204,7 @@ public: cct->_conf.set_val("rbd_mirror_image_policy_rebalance_timeout", "0"); auto wrapped_ctx = new FunctionContext([this, ctx](int r) { - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; ctx->complete(r); }); m_threads->work_queue->queue(wrapped_ctx, 0); @@ -227,9 +223,9 @@ public: .WillOnce(Invoke([this, &request, r]() { request.on_finish->complete(r); if (r == 0) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ++m_map_update_count; - m_cond.Signal(); + m_cond.notify_all(); } })); } @@ -239,10 +235,10 @@ public: std::map *peer_ack_ctxs) { EXPECT_CALL(mock_listener, mock_acquire_image(global_image_id, _)) .WillOnce(WithArg<1>(Invoke([this, global_image_id, peer_ack_ctxs](Context* ctx) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; peer_ack_ctxs->insert({global_image_id, ctx}); ++m_notify_update_count; - m_cond.Signal(); + m_cond.notify_all(); }))); } @@ -251,10 +247,10 @@ public: std::map *peer_ack_ctxs) { EXPECT_CALL(mock_listener, mock_release_image(global_image_id, _)) .WillOnce(WithArg<1>(Invoke([this, global_image_id, peer_ack_ctxs](Context* ctx) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; peer_ack_ctxs->insert({global_image_id, ctx}); ++m_notify_update_count; - m_cond.Signal(); + m_cond.notify_all(); }))); } @@ -265,10 +261,10 @@ public: EXPECT_CALL(mock_listener, mock_remove_image(mirror_uuid, global_image_id, _)) .WillOnce(WithArg<2>(Invoke([this, global_image_id, peer_ack_ctxs](Context* ctx) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; peer_ack_ctxs->insert({global_image_id, ctx}); ++m_notify_update_count; - m_cond.Signal(); + m_cond.notify_all(); }))); } @@ -278,11 +274,11 @@ public: EXPECT_CALL(mock_listener, mock_release_image(_, _)) .Times(count) .WillRepeatedly(Invoke([this, global_image_ids, peer_ack_ctxs](std::string global_image_id, Context* ctx) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; global_image_ids->emplace(global_image_id); peer_ack_ctxs->insert({global_image_id, ctx}); ++m_notify_update_count; - m_cond.Signal(); + m_cond.notify_all(); })); } @@ -359,9 +355,9 @@ public: } bool wait_for_listener_notify(uint32_t count) { - Mutex::Locker locker(m_lock); + std::unique_lock locker{m_lock}; while (m_notify_update_count < count) { - if (m_cond.WaitInterval(m_lock, utime_t(10, 0)) != 0) { + if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) { break; } } @@ -375,9 +371,9 @@ public: } bool wait_for_map_update(uint32_t count) { - Mutex::Locker locker(m_lock); + std::unique_lock locker{m_lock}; while (m_map_update_count < count) { - if (m_cond.WaitInterval(m_lock, utime_t(10, 0)) != 0) { + if (m_cond.wait_for(locker, 10s) == std::cv_status::timeout) { break; } } @@ -424,10 +420,10 @@ public: } } - Mutex m_lock; - Cond m_cond; - uint32_t m_notify_update_count; - uint32_t m_map_update_count; + ceph::mutex m_lock = ceph::make_mutex("TestMockImageMap::m_lock"); + ceph::condition_variable m_cond; + uint32_t m_notify_update_count = 0; + uint32_t m_map_update_count = 0; std::string m_local_instance_id; }; diff --git a/src/test/rbd_mirror/test_mock_ImageReplayer.cc b/src/test/rbd_mirror/test_mock_ImageReplayer.cc index e25b88712a5..47127743026 100644 --- a/src/test/rbd_mirror/test_mock_ImageReplayer.cc +++ b/src/test/rbd_mirror/test_mock_ImageReplayer.cc @@ -88,7 +88,7 @@ ImageDeleter* ImageDeleter:: template <> struct Threads { MockSafeTimer *timer; - Mutex &timer_lock; + ceph::mutex &timer_lock; MockContextWQ *work_queue; diff --git a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc index 747644beb5d..149facbcdbc 100644 --- a/src/test/rbd_mirror/test_mock_InstanceReplayer.cc +++ b/src/test/rbd_mirror/test_mock_InstanceReplayer.cc @@ -32,8 +32,8 @@ namespace mirror { template <> struct Threads { MockSafeTimer *timer; - Mutex &timer_lock; - Cond timer_cond; + ceph::mutex &timer_lock; + ceph::condition_variable timer_cond; MockContextWQ *work_queue; @@ -144,14 +144,14 @@ public: EXPECT_CALL(*mock_threads.timer, add_event_after(_, _)) .WillOnce(DoAll( WithArg<1>(Invoke([this, &mock_threads, timer_ctx](Context *ctx) { - ceph_assert(mock_threads.timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(mock_threads.timer_lock)); if (timer_ctx != nullptr) { *timer_ctx = ctx; - mock_threads.timer_cond.SignalOne(); + mock_threads.timer_cond.notify_one(); } else { m_threads->work_queue->queue( new FunctionContext([&mock_threads, ctx](int) { - Mutex::Locker timer_lock(mock_threads.timer_lock); + std::lock_guard timer_lock{mock_threads.timer_lock}; ctx->complete(0); }), 0); } @@ -281,7 +281,7 @@ TEST_F(TestMockInstanceReplayer, RemoveFinishedImage) { ASSERT_TRUE(timer_ctx1 != nullptr); { - Mutex::Locker timer_locker(mock_threads.timer_lock); + std::lock_guard timer_locker{mock_threads.timer_lock}; timer_ctx1->complete(0); } diff --git a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc index 075134d9912..c2e0d41f3f7 100644 --- a/src/test/rbd_mirror/test_mock_InstanceWatcher.cc +++ b/src/test/rbd_mirror/test_mock_InstanceWatcher.cc @@ -64,7 +64,7 @@ namespace mirror { template <> struct Threads { - Mutex &timer_lock; + ceph::mutex &timer_lock; SafeTimer *timer; ContextWQ *work_queue; diff --git a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc index 981055ec31f..13ad8b4c96b 100644 --- a/src/test/rbd_mirror/test_mock_LeaderWatcher.cc +++ b/src/test/rbd_mirror/test_mock_LeaderWatcher.cc @@ -64,7 +64,7 @@ struct ManagedLock { const std::string& oid, librbd::Watcher *watcher, managed_lock::Mode mode, bool blacklist_on_break_lock, uint32_t blacklist_expire_seconds) - : m_work_queue(work_queue), m_lock("ManagedLock::m_lock") { + : m_work_queue(work_queue) { MockManagedLock::get_instance().construct(); } @@ -74,7 +74,7 @@ struct ManagedLock { ContextWQ *m_work_queue; - mutable Mutex m_lock; + mutable ceph::mutex m_lock = ceph::make_mutex("ManagedLock::m_lock"); bool is_lock_owner() const { return MockManagedLock::get_instance().is_lock_owner(); @@ -182,7 +182,7 @@ namespace mirror { template <> struct Threads { - Mutex &timer_lock; + ceph::mutex &timer_lock; SafeTimer *timer; ContextWQ *work_queue; diff --git a/src/test/rbd_mirror/test_mock_PoolReplayer.cc b/src/test/rbd_mirror/test_mock_PoolReplayer.cc index 6879e036a49..4edf94cc8cb 100644 --- a/src/test/rbd_mirror/test_mock_PoolReplayer.cc +++ b/src/test/rbd_mirror/test_mock_PoolReplayer.cc @@ -250,8 +250,8 @@ struct ServiceDaemon { template <> struct Threads { MockSafeTimer *timer; - Mutex &timer_lock; - Cond timer_cond; + ceph::mutex &timer_lock; + ceph::condition_variable timer_cond; MockContextWQ *work_queue; diff --git a/src/test/rbd_mirror/test_mock_PoolWatcher.cc b/src/test/rbd_mirror/test_mock_PoolWatcher.cc index b4dd66e8d81..ae763a2f54a 100644 --- a/src/test/rbd_mirror/test_mock_PoolWatcher.cc +++ b/src/test/rbd_mirror/test_mock_PoolWatcher.cc @@ -89,7 +89,7 @@ namespace mirror { template <> struct Threads { MockSafeTimer *timer; - Mutex &timer_lock; + ceph::mutex &timer_lock; MockContextWQ *work_queue; @@ -173,8 +173,7 @@ public: } }; - TestMockPoolWatcher() : m_lock("TestMockPoolWatcher::m_lock") { - } + TestMockPoolWatcher() = default; void expect_work_queue(MockThreads &mock_threads) { EXPECT_CALL(*mock_threads.work_queue, queue(_, _)) @@ -217,9 +216,9 @@ public: EXPECT_CALL(mock_listener, mock_handle_update(mirror_uuid, added_image_ids, removed_image_ids)) .WillOnce(WithoutArgs(Invoke([this]() { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ++m_update_count; - m_cond.Signal(); + m_cond.notify_all(); }))); } @@ -242,7 +241,7 @@ public: .WillOnce(DoAll(WithArg<1>(Invoke([this](Context *ctx) { auto wrapped_ctx = new FunctionContext([this, ctx](int r) { - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; ctx->complete(r); }); m_threads->work_queue->queue(wrapped_ctx, 0); @@ -257,22 +256,18 @@ public: } bool wait_for_update(uint32_t count) { - Mutex::Locker locker(m_lock); - while (m_update_count < count) { - if (m_cond.WaitInterval(m_lock, utime_t(10, 0)) != 0) { - break; - } - } - if (m_update_count < count) { + std::unique_lock locker{m_lock}; + if (m_cond.wait_for(locker, 10s, + [count, this] { return m_update_count >= count; })) { + m_update_count -= count; + return true; + } else { return false; } - - m_update_count -= count; - return true; } - Mutex m_lock; - Cond m_cond; + ceph::mutex m_lock = ceph::make_mutex("TestMockPoolWatcher::m_lock"); + ceph::condition_variable m_cond; uint32_t m_update_count = 0; }; @@ -352,9 +347,9 @@ TEST_F(TestMockPoolWatcher, NotifyDuringRefresh) { &refresh_sent]() { *mock_refresh_images_request.image_ids = image_ids; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; refresh_sent = true; - m_cond.Signal(); + m_cond.notify_all(); })); expect_mirror_uuid_get(m_remote_io_ctx, "remote uuid", 0); @@ -370,10 +365,8 @@ TEST_F(TestMockPoolWatcher, NotifyDuringRefresh) { mock_pool_watcher.init(nullptr); { - Mutex::Locker locker(m_lock); - while (!refresh_sent) { - m_cond.Wait(m_lock); - } + std::unique_lock locker{m_lock}; + m_cond.wait(locker, [&] { return refresh_sent; }); } MirroringWatcher::get_instance().handle_image_updated( @@ -416,10 +409,10 @@ TEST_F(TestMockPoolWatcher, Notify) { Context *notify_ctx = nullptr; EXPECT_CALL(*mock_threads.work_queue, queue(_, _)) .WillOnce(Invoke([this, ¬ify_ctx](Context *ctx, int r) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ASSERT_EQ(nullptr, notify_ctx); notify_ctx = ctx; - m_cond.Signal(); + m_cond.notify_all(); })); expect_listener_handle_update( mock_listener, "remote uuid", diff --git a/src/tools/rbd_mirror/ClusterWatcher.cc b/src/tools/rbd_mirror/ClusterWatcher.cc index 54329de6379..bc2400d81c3 100644 --- a/src/tools/rbd_mirror/ClusterWatcher.cc +++ b/src/tools/rbd_mirror/ClusterWatcher.cc @@ -30,7 +30,7 @@ using librados::IoCtx; namespace rbd { namespace mirror { -ClusterWatcher::ClusterWatcher(RadosRef cluster, Mutex &lock, +ClusterWatcher::ClusterWatcher(RadosRef cluster, ceph::mutex &lock, ServiceDaemon* service_daemon) : m_cluster(cluster), m_lock(lock), m_service_daemon(service_daemon) { @@ -38,7 +38,7 @@ ClusterWatcher::ClusterWatcher(RadosRef cluster, Mutex &lock, const ClusterWatcher::PoolPeers& ClusterWatcher::get_pool_peers() const { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); return m_pool_peers; } @@ -49,7 +49,7 @@ void ClusterWatcher::refresh_pools() PoolPeers pool_peers; read_pool_peers(&pool_peers); - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; m_pool_peers = pool_peers; // TODO: perhaps use a workqueue instead, once we get notifications // about config changes for existing pools diff --git a/src/tools/rbd_mirror/ClusterWatcher.h b/src/tools/rbd_mirror/ClusterWatcher.h index e8430b476ce..a5105e637b6 100644 --- a/src/tools/rbd_mirror/ClusterWatcher.h +++ b/src/tools/rbd_mirror/ClusterWatcher.h @@ -9,7 +9,7 @@ #include #include "common/ceph_context.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/Timer.h" #include "include/rados/librados.hpp" #include "tools/rbd_mirror/Types.h" @@ -37,7 +37,7 @@ public: typedef std::set Peers; typedef std::map PoolPeers; - ClusterWatcher(RadosRef cluster, Mutex &lock, + ClusterWatcher(RadosRef cluster, ceph::mutex &lock, ServiceDaemon* service_daemon); ~ClusterWatcher() = default; ClusterWatcher(const ClusterWatcher&) = delete; @@ -51,7 +51,7 @@ private: typedef std::unordered_map ServicePools; RadosRef m_cluster; - Mutex &m_lock; + ceph::mutex &m_lock; ServiceDaemon* m_service_daemon; ServicePools m_service_pools; diff --git a/src/tools/rbd_mirror/ImageDeleter.cc b/src/tools/rbd_mirror/ImageDeleter.cc index 93ddf5cea1b..6a77955a294 100644 --- a/src/tools/rbd_mirror/ImageDeleter.cc +++ b/src/tools/rbd_mirror/ImageDeleter.cc @@ -128,8 +128,9 @@ ImageDeleter::ImageDeleter(librados::IoCtx& local_io_ctx, ServiceDaemon* service_daemon) : m_local_io_ctx(local_io_ctx), m_threads(threads), m_service_daemon(service_daemon), m_trash_listener(this), - m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageDeleter::m_lock", - this)) { + m_lock(ceph::make_mutex( + librbd::util::unique_lock_name("rbd::mirror::ImageDeleter::m_lock", + this))) { } #undef dout_prefix @@ -192,8 +193,7 @@ void ImageDeleter::shut_down_trash_watcher(Context* on_finish) { template void ImageDeleter::wait_for_ops(Context* on_finish) { { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; m_running = false; cancel_retry_timer(); } @@ -207,7 +207,7 @@ void ImageDeleter::wait_for_ops(Context* on_finish) { template void ImageDeleter::cancel_all_deletions(Context* on_finish) { { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; // wake up any external state machines waiting on deletions ceph_assert(m_in_flight_delete_queue.empty()); for (auto& queue : {&m_delete_queue, &m_retry_delete_queue}) { @@ -230,7 +230,7 @@ void ImageDeleter::wait_for_deletion(const std::string& image_id, m_threads->work_queue->queue(on_finish, r); }); - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto del_info = find_delete_info(image_id); if (!del_info && scheduled_only) { // image not scheduled for deletion @@ -246,7 +246,7 @@ template void ImageDeleter::complete_active_delete(DeleteInfoRef* delete_info, int r) { dout(20) << "info=" << *delete_info << ", r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; notify_on_delete((*delete_info)->image_id, r); delete_info->reset(); } @@ -257,20 +257,19 @@ void ImageDeleter::enqueue_failed_delete(DeleteInfoRef* delete_info, double retry_delay) { dout(20) << "info=" << *delete_info << ", r=" << error_code << dendl; if (error_code == -EBLACKLISTED) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; derr << "blacklisted while deleting local image" << dendl; complete_active_delete(delete_info, error_code); return; } - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; auto& delete_info_ref = *delete_info; notify_on_delete(delete_info_ref->image_id, error_code); delete_info_ref->error_code = error_code; ++delete_info_ref->retries; - delete_info_ref->retry_time = ceph_clock_now(); - delete_info_ref->retry_time += retry_delay; + delete_info_ref->retry_time = (clock_t::now() + + ceph::make_timespan(retry_delay)); m_retry_delete_queue.push_back(delete_info_ref); schedule_retry_timer(); @@ -279,7 +278,7 @@ void ImageDeleter::enqueue_failed_delete(DeleteInfoRef* delete_info, template typename ImageDeleter::DeleteInfoRef ImageDeleter::find_delete_info(const std::string &image_id) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); DeleteQueue delete_queues[] = {m_in_flight_delete_queue, m_retry_delete_queue, m_delete_queue}; @@ -306,7 +305,7 @@ void ImageDeleter::print_status(Formatter *f, stringstream *ss) { f->open_array_section("delete_images_queue"); } - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; for (const auto& image : m_delete_queue) { image->print_status(f, ss); } @@ -331,7 +330,7 @@ template vector ImageDeleter::get_delete_queue_items() { vector items; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; for (const auto& del_info : m_delete_queue) { items.push_back(del_info->image_id); } @@ -343,7 +342,7 @@ template vector > ImageDeleter::get_failed_queue_items() { vector > items; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; for (const auto& del_info : m_retry_delete_queue) { items.push_back(make_pair(del_info->image_id, del_info->error_code)); @@ -360,7 +359,7 @@ void ImageDeleter::remove_images() { uint64_t max_concurrent_deletions = cct->_conf.get_val( "rbd_mirror_concurrent_image_deletions"); - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; while (true) { if (!m_running || m_delete_queue.empty() || m_in_flight_delete_queue.size() >= max_concurrent_deletions) { @@ -378,7 +377,7 @@ void ImageDeleter::remove_images() { template void ImageDeleter::remove_image(DeleteInfoRef delete_info) { dout(10) << "info=" << *delete_info << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); m_in_flight_delete_queue.push_back(delete_info); m_async_op_tracker.start_op(); @@ -400,8 +399,8 @@ void ImageDeleter::handle_remove_image(DeleteInfoRef delete_info, dout(10) << "info=" << *delete_info << ", r=" << r << dendl; { - Mutex::Locker locker(m_lock); - ceph_assert(m_lock.is_locked()); + std::lock_guard locker{m_lock}; + ceph_assert(ceph_mutex_is_locked(m_lock)); auto it = std::find(m_in_flight_delete_queue.begin(), m_in_flight_delete_queue.end(), delete_info); ceph_assert(it != m_in_flight_delete_queue.end()); @@ -430,8 +429,8 @@ void ImageDeleter::handle_remove_image(DeleteInfoRef delete_info, template void ImageDeleter::schedule_retry_timer() { - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (!m_running || m_timer_ctx != nullptr || m_retry_delete_queue.empty()) { return; } @@ -447,7 +446,7 @@ void ImageDeleter::schedule_retry_timer() { template void ImageDeleter::cancel_retry_timer() { dout(10) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); if (m_timer_ctx != nullptr) { bool canceled = m_threads->timer->cancel_event(m_timer_ctx); m_timer_ctx = nullptr; @@ -458,8 +457,8 @@ void ImageDeleter::cancel_retry_timer() { template void ImageDeleter::handle_retry_timer() { dout(10) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); - Mutex::Locker locker(m_lock); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + std::lock_guard locker{m_lock}; ceph_assert(m_timer_ctx != nullptr); m_timer_ctx = nullptr; @@ -468,7 +467,7 @@ void ImageDeleter::handle_retry_timer() { ceph_assert(!m_retry_delete_queue.empty()); // move all ready-to-ready items back to main queue - utime_t now = ceph_clock_now(); + auto now = clock_t::now(); while (!m_retry_delete_queue.empty()) { auto &delete_info = m_retry_delete_queue.front(); if (delete_info->retry_time > now) { @@ -493,9 +492,8 @@ void ImageDeleter::handle_retry_timer() { template void ImageDeleter::handle_trash_image(const std::string& image_id, - const utime_t& deferment_end_time) { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + const ImageDeleter::clock_t::time_point& deferment_end_time) { + std::scoped_lock locker{m_threads->timer_lock, m_lock}; auto del_info = find_delete_info(image_id); if (del_info != nullptr) { @@ -505,7 +503,7 @@ void ImageDeleter::handle_trash_image(const std::string& image_id, } dout(10) << "image_id=" << image_id << ", " - << "deferment_end_time=" << deferment_end_time << dendl; + << "deferment_end_time=" << utime_t{deferment_end_time} << dendl; del_info.reset(new DeleteInfo(image_id)); del_info->retry_time = deferment_end_time; diff --git a/src/tools/rbd_mirror/ImageDeleter.h b/src/tools/rbd_mirror/ImageDeleter.h index 8a17eb38c2b..5b8f46a35bc 100644 --- a/src/tools/rbd_mirror/ImageDeleter.h +++ b/src/tools/rbd_mirror/ImageDeleter.h @@ -17,7 +17,7 @@ #include "include/utime.h" #include "common/AsyncOpTracker.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "tools/rbd_mirror/Types.h" #include "tools/rbd_mirror/image_deleter/Types.h" #include @@ -81,6 +81,7 @@ public: } private: + using clock_t = ceph::real_clock; struct TrashListener : public image_deleter::TrashListener { ImageDeleter *image_deleter; @@ -88,7 +89,7 @@ private: } void handle_trash_image(const std::string& image_id, - const utime_t& deferment_end_time) override { + const ceph::real_clock::time_point& deferment_end_time) override { image_deleter->handle_trash_image(image_id, deferment_end_time); } }; @@ -98,7 +99,7 @@ private: image_deleter::ErrorResult error_result = {}; int error_code = 0; - utime_t retry_time = {}; + clock_t::time_point retry_time; int retries = 0; DeleteInfo(const std::string& image_id) @@ -134,7 +135,7 @@ private: AsyncOpTracker m_async_op_tracker; - Mutex m_lock; + ceph::mutex m_lock; DeleteQueue m_delete_queue; DeleteQueue m_retry_delete_queue; DeleteQueue m_in_flight_delete_queue; @@ -162,7 +163,7 @@ private: void handle_retry_timer(); void handle_trash_image(const std::string& image_id, - const utime_t& deferment_end_time); + const clock_t::time_point& deferment_end_time); void shut_down_trash_watcher(Context* on_finish); void wait_for_ops(Context* on_finish); diff --git a/src/tools/rbd_mirror/ImageMap.cc b/src/tools/rbd_mirror/ImageMap.cc index 58fa5e03040..e1089f79140 100644 --- a/src/tools/rbd_mirror/ImageMap.cc +++ b/src/tools/rbd_mirror/ImageMap.cc @@ -58,7 +58,8 @@ ImageMap::ImageMap(librados::IoCtx &ioctx, Threads *threads, image_map::Listener &listener) : m_ioctx(ioctx), m_threads(threads), m_instance_id(instance_id), m_listener(listener), - m_lock(unique_lock_name("rbd::mirror::ImageMap::m_lock", this)) { + m_lock(ceph::make_mutex( + unique_lock_name("rbd::mirror::ImageMap::m_lock", this))) { } template @@ -74,7 +75,7 @@ void ImageMap::continue_action(const std::set &global_image_ids, dout(20) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_shutting_down) { return; } @@ -148,7 +149,7 @@ template void ImageMap::process_updates() { dout(20) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(m_timer_task == nullptr); Updates map_updates; @@ -157,7 +158,7 @@ void ImageMap::process_updates() { Updates release_updates; // gather updates by advancing the state machine - m_lock.Lock(); + m_lock.lock(); for (auto const &global_image_id : m_global_image_ids) { image_map::ActionType action_type = m_policy->start_action(global_image_id); @@ -188,7 +189,7 @@ void ImageMap::process_updates() { } } m_global_image_ids.clear(); - m_lock.Unlock(); + m_lock.unlock(); // notify listener (acquire, release) and update on-disk map. note // that its safe to process this outside m_lock as we still hold @@ -199,13 +200,13 @@ void ImageMap::process_updates() { template void ImageMap::schedule_update_task() { - Mutex::Locker timer_lock(m_threads->timer_lock); + std::lock_guard timer_lock{m_threads->timer_lock}; schedule_update_task(m_threads->timer_lock); } template -void ImageMap::schedule_update_task(const Mutex &timer_lock) { - ceph_assert(m_threads->timer_lock.is_locked()); +void ImageMap::schedule_update_task(const ceph::mutex &timer_lock) { + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); schedule_rebalance_task(); @@ -214,14 +215,14 @@ void ImageMap::schedule_update_task(const Mutex &timer_lock) { } { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_global_image_ids.empty()) { return; } } m_timer_task = new FunctionContext([this](int r) { - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); m_timer_task = nullptr; process_updates(); @@ -240,7 +241,7 @@ void ImageMap::rebalance() { ceph_assert(m_rebalance_task == nullptr); { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_async_op_tracker.empty() && m_global_image_ids.empty()){ dout(20) << "starting rebalance" << dendl; @@ -258,7 +259,7 @@ void ImageMap::rebalance() { template void ImageMap::schedule_rebalance_task() { - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); CephContext *cct = reinterpret_cast(m_ioctx.cct()); @@ -275,7 +276,7 @@ void ImageMap::schedule_rebalance_task() { } m_rebalance_task = new FunctionContext([this](int _) { - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); m_rebalance_task = nullptr; rebalance(); @@ -289,7 +290,7 @@ void ImageMap::schedule_rebalance_task() { template void ImageMap::schedule_action(const std::string &global_image_id) { dout(20) << "global_image_id=" << global_image_id << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); m_global_image_ids.emplace(global_image_id); } @@ -342,7 +343,7 @@ void ImageMap::handle_load(const std::mapinit(image_mapping); for (auto& pair : image_mapping) { @@ -355,7 +356,7 @@ void ImageMap::handle_load(const std::map void ImageMap::handle_peer_ack_remove(const std::string &global_image_id, int r) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; dout(5) << "global_image_id=" << global_image_id << dendl; if (r < 0) { @@ -376,7 +377,7 @@ void ImageMap::update_images_added( const std::set &global_image_ids) { dout(5) << "peer_uuid=" << peer_uuid << ", " << "global_image_ids=[" << global_image_ids << "]" << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); for (auto const &global_image_id : global_image_ids) { auto result = m_peer_map[global_image_id].insert(peer_uuid); @@ -394,7 +395,7 @@ void ImageMap::update_images_removed( const std::set &global_image_ids) { dout(5) << "peer_uuid=" << peer_uuid << ", " << "global_image_ids=[" << global_image_ids << "]" << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Updates to_remove; for (auto const &global_image_id : global_image_ids) { @@ -434,7 +435,7 @@ template void ImageMap::update_instances_added( const std::vector &instance_ids) { { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_shutting_down) { return; } @@ -462,7 +463,7 @@ template void ImageMap::update_instances_removed( const std::vector &instance_ids) { { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_shutting_down) { return; } @@ -495,7 +496,7 @@ void ImageMap::update_images(const std::string &peer_uuid, << removed_global_image_ids.size() << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_shutting_down) { return; } @@ -546,10 +547,10 @@ void ImageMap::shut_down(Context *on_finish) { dout(20) << dendl; { - Mutex::Locker timer_lock(m_threads->timer_lock); + std::lock_guard timer_lock{m_threads->timer_lock}; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_shutting_down); m_shutting_down = true; diff --git a/src/tools/rbd_mirror/ImageMap.h b/src/tools/rbd_mirror/ImageMap.h index 283f55db362..9dd61ee0d6e 100644 --- a/src/tools/rbd_mirror/ImageMap.h +++ b/src/tools/rbd_mirror/ImageMap.h @@ -6,7 +6,7 @@ #include -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "include/Context.h" #include "common/AsyncOpTracker.h" #include "cls/rbd/cls_rbd_types.h" @@ -89,7 +89,7 @@ private: std::unique_ptr m_policy; // our mapping policy Context *m_timer_task = nullptr; - Mutex m_lock; + ceph::mutex m_lock; bool m_shutting_down = false; AsyncOpTracker m_async_op_tracker; @@ -147,7 +147,7 @@ private: void schedule_action(const std::string &global_image_id); void schedule_update_task(); - void schedule_update_task(const Mutex &timer_lock); + void schedule_update_task(const ceph::mutex &timer_lock); void process_updates(); void update_image_mapping(Updates&& map_updates, std::set&& map_removals); diff --git a/src/tools/rbd_mirror/ImageReplayer.cc b/src/tools/rbd_mirror/ImageReplayer.cc index f3940e8dec0..f803834eb19 100644 --- a/src/tools/rbd_mirror/ImageReplayer.cc +++ b/src/tools/rbd_mirror/ImageReplayer.cc @@ -269,8 +269,8 @@ ImageReplayer::ImageReplayer( m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id), m_global_image_id(global_image_id), m_local_image_name(global_image_id), - m_lock("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " + - global_image_id), + m_lock(ceph::make_mutex("rbd::mirror::ImageReplayer " + stringify(local_pool_id) + " " + + global_image_id)), m_progress_cxt(this), m_journal_listener(new JournalListener(this)), m_remote_listener(this) @@ -311,7 +311,7 @@ ImageReplayer::~ImageReplayer() template image_replayer::HealthState ImageReplayer::get_health_state() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (!m_mirror_image_status_state) { return image_replayer::HEALTH_STATE_OK; @@ -327,7 +327,7 @@ image_replayer::HealthState ImageReplayer::get_health_state() const { template void ImageReplayer::add_peer(const std::string &peer_uuid, librados::IoCtx &io_ctx) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto it = m_peers.find({peer_uuid}); if (it == m_peers.end()) { m_peers.insert({peer_uuid, io_ctx}); @@ -338,7 +338,7 @@ template void ImageReplayer::set_state_description(int r, const std::string &desc) { dout(10) << r << " " << desc << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; m_last_r = r; m_state_desc = desc; } @@ -350,7 +350,7 @@ void ImageReplayer::start(Context *on_finish, bool manual) int r = 0; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (!is_stopped_()) { derr << "already running" << dendl; r = -EINVAL; @@ -500,7 +500,7 @@ void ImageReplayer::bootstrap() { BootstrapRequest *request = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (on_start_interrupted(m_lock)) { return; } @@ -527,7 +527,7 @@ template void ImageReplayer::handle_bootstrap(int r) { dout(10) << "r=" << r << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_bootstrap_request->put(); m_bootstrap_request = nullptr; if (m_local_image_ctx) { @@ -556,7 +556,7 @@ void ImageReplayer::handle_bootstrap(int r) { ceph_assert(m_local_journal == nullptr); { - RWLock::RLocker image_locker(m_local_image_ctx->image_lock); + std::shared_lock image_locker{m_local_image_ctx->image_lock}; if (m_local_image_ctx->journal != nullptr) { m_local_journal = m_local_image_ctx->journal; m_local_journal->add_listener(m_journal_listener); @@ -648,7 +648,7 @@ void ImageReplayer::handle_start_replay(int r) { Context *on_finish(nullptr); { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_state == STATE_STARTING); m_state = STATE_REPLAYING; std::swap(m_on_start_finish, on_finish); @@ -671,7 +671,7 @@ void ImageReplayer::handle_start_replay(int r) { double poll_seconds = cct->_conf.get_val( "rbd_mirror_journal_poll_age"); - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_replay_handler = new ReplayHandler(this); m_remote_journaler->start_live_replay(m_replay_handler, poll_seconds); @@ -691,7 +691,7 @@ void ImageReplayer::on_start_fail(int r, const std::string &desc) dout(10) << "r=" << r << dendl; Context *ctx = new FunctionContext([this, r, desc](int _r) { { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_state == STATE_STARTING); m_state = STATE_STOPPING; if (r < 0 && r != -ECANCELED && r != -EREMOTEIO && r != -ENOENT) { @@ -713,13 +713,13 @@ void ImageReplayer::on_start_fail(int r, const std::string &desc) template bool ImageReplayer::on_start_interrupted() { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return on_start_interrupted(m_lock); } template -bool ImageReplayer::on_start_interrupted(Mutex& lock) { - ceph_assert(m_lock.is_locked()); +bool ImageReplayer::on_start_interrupted(ceph::mutex& lock) { + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_state == STATE_STARTING); if (!m_stop_requested) { return false; @@ -740,7 +740,7 @@ void ImageReplayer::stop(Context *on_finish, bool manual, int r, bool shut_down_replay = false; bool running = true; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (!is_running_()) { running = false; @@ -793,7 +793,7 @@ void ImageReplayer::on_stop_journal_replay(int r, const std::string &desc) dout(10) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_state != STATE_REPLAYING) { // might be invoked multiple times while stopping return; @@ -822,9 +822,9 @@ void ImageReplayer::handle_replay_ready() m_event_replay_tracker.start_op(); - m_lock.Lock(); + m_lock.lock(); bool stopping = (m_state == STATE_STOPPING); - m_lock.Unlock(); + m_lock.unlock(); if (stopping) { dout(10) << "stopping event replay" << dendl; @@ -867,9 +867,9 @@ void ImageReplayer::flush() template void ImageReplayer::flush_local_replay(Context* on_flush) { - m_lock.Lock(); + m_lock.lock(); if (m_state != STATE_REPLAYING) { - m_lock.Unlock(); + m_lock.unlock(); on_flush->complete(0); return; } @@ -880,7 +880,7 @@ void ImageReplayer::flush_local_replay(Context* on_flush) handle_flush_local_replay(on_flush, r); }); m_local_replay->flush(ctx); - m_lock.Unlock(); + m_lock.unlock(); } template @@ -899,9 +899,9 @@ void ImageReplayer::handle_flush_local_replay(Context* on_flush, int r) template void ImageReplayer::flush_commit_position(Context* on_flush) { - m_lock.Lock(); + m_lock.lock(); if (m_state != STATE_REPLAYING) { - m_lock.Unlock(); + m_lock.unlock(); on_flush->complete(0); return; } @@ -912,7 +912,7 @@ void ImageReplayer::flush_commit_position(Context* on_flush) handle_flush_commit_position(on_flush, r); }); m_remote_journaler->flush_commit_position(ctx); - m_lock.Unlock(); + m_lock.unlock(); } template @@ -932,7 +932,7 @@ bool ImageReplayer::on_replay_interrupted() { bool shut_down; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; shut_down = m_stop_requested; } @@ -947,7 +947,7 @@ void ImageReplayer::print_status(Formatter *f, stringstream *ss) { dout(10) << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (f) { f->open_object_section("image_replayer"); @@ -969,7 +969,7 @@ void ImageReplayer::handle_replay_complete(int r, const std::string &error_de } { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_stop_requested = true; } on_stop_journal_replay(r, error_desc); @@ -981,7 +981,7 @@ void ImageReplayer::replay_flush() { bool interrupted = false; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_state != STATE_REPLAYING) { dout(10) << "replay interrupted" << dendl; interrupted = true; @@ -1018,7 +1018,7 @@ void ImageReplayer::handle_replay_flush(int r) { dout(10) << "r=" << r << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_state == STATE_REPLAY_FLUSHING); m_state = STATE_REPLAYING; } @@ -1096,7 +1096,7 @@ void ImageReplayer::allocate_local_tag() { return; } else { dout(5) << "encountered image demotion: stopping" << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_stop_requested = true; } } @@ -1155,11 +1155,11 @@ void ImageReplayer::preprocess_entry() { dout(20) << "delaying replay by " << delay << " sec" << dendl; - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; ceph_assert(m_delayed_preprocess_task == nullptr); m_delayed_preprocess_task = new FunctionContext( [this](int r) { - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); m_delayed_preprocess_task = nullptr; m_threads->work_queue->queue( create_context_callback::handle_process_entry_ready(int r) { bool update_status = false; { - RWLock::RLocker image_locker(m_local_image_ctx->image_lock); + std::shared_lock image_locker{m_local_image_ctx->image_lock}; if (m_local_image_name != m_local_image_ctx->name) { m_local_image_name = m_local_image_ctx->name; update_status = true; @@ -1270,7 +1270,7 @@ void ImageReplayer::handle_process_entry_safe(const ReplayEntry &replay_entry auto ctx = new FunctionContext( [this, bytes, latency](int r) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_perf_counters) { m_perf_counters->inc(l_rbd_mirror_replay); m_perf_counters->inc(l_rbd_mirror_replay_bytes, bytes); @@ -1286,7 +1286,7 @@ bool ImageReplayer::update_mirror_image_status(bool force, const OptionalState &state) { dout(15) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (!start_mirror_image_status_update(force, false)) { return false; } @@ -1299,7 +1299,7 @@ bool ImageReplayer::update_mirror_image_status(bool force, template bool ImageReplayer::start_mirror_image_status_update(bool force, bool restarting) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (!force && !is_stopped_()) { if (!is_running_()) { @@ -1323,7 +1323,7 @@ void ImageReplayer::finish_mirror_image_status_update() { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_in_flight_status_updates > 0); if (--m_in_flight_status_updates > 0) { dout(15) << "waiting on " << m_in_flight_status_updates << " in-flight " @@ -1369,7 +1369,7 @@ void ImageReplayer::send_mirror_status_update(const OptionalState &opt_state) boost::make_optional(false, cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN); image_replayer::BootstrapRequest* bootstrap_request = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; state = m_state; state_desc = m_state_desc; mirror_image_status_state = m_mirror_image_status_state; @@ -1460,7 +1460,7 @@ void ImageReplayer::send_mirror_status_update(const OptionalState &opt_state) } { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_mirror_image_status_state = mirror_image_status_state; } @@ -1489,7 +1489,7 @@ void ImageReplayer::handle_mirror_status_update(int r) { bool running = false; bool started = false; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; bool update_status_requested = false; std::swap(update_status_requested, m_update_status_requested); @@ -1515,8 +1515,8 @@ template void ImageReplayer::reschedule_update_status_task(int new_interval) { bool canceled_task = false; { - Mutex::Locker locker(m_lock); - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard locker{m_lock}; + std::lock_guard timer_locker{m_threads->timer_lock}; if (m_update_status_task) { dout(15) << "canceling existing status update task" << dendl; @@ -1533,7 +1533,7 @@ void ImageReplayer::reschedule_update_status_task(int new_interval) { start_mirror_image_status_update(true, false)) { m_update_status_task = new FunctionContext( [this](int r) { - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); m_update_status_task = nullptr; queue_mirror_image_status_update(boost::none); @@ -1557,7 +1557,7 @@ void ImageReplayer::shut_down(int r) { bool canceled_delayed_preprocess_task = false; { - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; if (m_delayed_preprocess_task != nullptr) { canceled_delayed_preprocess_task = m_threads->timer->cancel_event( m_delayed_preprocess_task); @@ -1573,7 +1573,7 @@ void ImageReplayer::shut_down(int r) { reschedule_update_status_task(-1); { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_state == STATE_STOPPING); // if status updates are in-flight, wait for them to complete @@ -1689,7 +1689,7 @@ void ImageReplayer::handle_shut_down(int r) { bool delete_requested = false; bool unregister_asok_hook = false; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; // if status updates are in-flight, wait for them to complete // before proceeding @@ -1743,7 +1743,7 @@ void ImageReplayer::handle_shut_down(int r) { Context *on_start = nullptr; Context *on_stop = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; std::swap(on_start, m_on_start_finish); std::swap(on_stop, m_on_stop_finish); m_stop_requested = false; @@ -1769,7 +1769,7 @@ void ImageReplayer::handle_remote_journal_metadata_updated() { cls::journal::Client client; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (!is_running_()) { return; } @@ -1818,7 +1818,7 @@ template void ImageReplayer::register_admin_socket_hook() { ImageReplayerAdminSocketHook *asok_hook; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_asok_hook != nullptr) { return; } @@ -1858,7 +1858,7 @@ void ImageReplayer::unregister_admin_socket_hook() { AdminSocketHook *asok_hook = nullptr; PerfCounters *perf_counters = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; std::swap(asok_hook, m_asok_hook); std::swap(perf_counters, m_perf_counters); } @@ -1872,7 +1872,7 @@ void ImageReplayer::unregister_admin_socket_hook() { template void ImageReplayer::reregister_admin_socket_hook() { { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto name = m_local_ioctx->get_pool_name() + "/" + m_local_image_name; if (m_asok_hook != nullptr && m_name == name) { return; diff --git a/src/tools/rbd_mirror/ImageReplayer.h b/src/tools/rbd_mirror/ImageReplayer.h index 2881a9de86d..77f3bb2b84b 100644 --- a/src/tools/rbd_mirror/ImageReplayer.h +++ b/src/tools/rbd_mirror/ImageReplayer.h @@ -5,7 +5,7 @@ #define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H #include "common/AsyncOpTracker.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/WorkQueue.h" #include "include/rados/librados.hpp" #include "cls/journal/cls_journal_types.h" @@ -85,25 +85,25 @@ public: ImageReplayer(const ImageReplayer&) = delete; ImageReplayer& operator=(const ImageReplayer&) = delete; - bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); } - bool is_running() { Mutex::Locker l(m_lock); return is_running_(); } - bool is_replaying() { Mutex::Locker l(m_lock); return is_replaying_(); } + bool is_stopped() { std::lock_guard l{m_lock}; return is_stopped_(); } + bool is_running() { std::lock_guard l{m_lock}; return is_running_(); } + bool is_replaying() { std::lock_guard l{m_lock}; return is_replaying_(); } - std::string get_name() { Mutex::Locker l(m_lock); return m_name; }; + std::string get_name() { std::lock_guard l{m_lock}; return m_name; }; void set_state_description(int r, const std::string &desc); // TODO temporary until policy handles release of image replayers inline bool is_finished() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_finished; } inline void set_finished(bool finished) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_finished = finished; } inline bool is_blacklisted() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return (m_last_r == -EBLACKLISTED); } @@ -204,7 +204,7 @@ protected: virtual void on_start_fail(int r, const std::string &desc); virtual bool on_start_interrupted(); - virtual bool on_start_interrupted(Mutex& lock); + virtual bool on_start_interrupted(ceph::mutex& lock); virtual void on_stop_journal_replay(int r = 0, const std::string &desc = ""); @@ -285,7 +285,7 @@ private: std::string m_local_image_name; std::string m_name; - mutable Mutex m_lock; + mutable ceph::mutex m_lock; State m_state = STATE_STOPPED; std::string m_state_desc; diff --git a/src/tools/rbd_mirror/ImageSync.cc b/src/tools/rbd_mirror/ImageSync.cc index 5677f035628..918bc22190f 100644 --- a/src/tools/rbd_mirror/ImageSync.cc +++ b/src/tools/rbd_mirror/ImageSync.cc @@ -47,7 +47,7 @@ public: template ImageSync::ImageSync(I *local_image_ctx, I *remote_image_ctx, - SafeTimer *timer, Mutex *timer_lock, + SafeTimer *timer, ceph::mutex *timer_lock, const std::string &mirror_uuid, Journaler *journaler, MirrorPeerClientMeta *client_meta, ContextWQ *work_queue, @@ -59,7 +59,7 @@ ImageSync::ImageSync(I *local_image_ctx, I *remote_image_ctx, m_journaler(journaler), m_client_meta(client_meta), m_work_queue(work_queue), m_instance_watcher(instance_watcher), m_progress_ctx(progress_ctx), - m_lock(unique_lock_name("ImageSync::m_lock", this)), + m_lock(ceph::make_mutex(unique_lock_name("ImageSync::m_lock", this))), m_update_sync_point_interval(m_local_image_ctx->cct->_conf.template get_val( "rbd_mirror_sync_point_update_age")), m_client_meta_copy(*client_meta) { } @@ -78,7 +78,7 @@ void ImageSync::send() { template void ImageSync::cancel() { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; dout(10) << dendl; @@ -99,9 +99,9 @@ void ImageSync::send_notify_sync_request() { dout(10) << dendl; - m_lock.Lock(); + m_lock.lock(); if (m_canceled) { - m_lock.Unlock(); + m_lock.unlock(); BaseRequest::finish(-ECANCELED); return; } @@ -110,18 +110,18 @@ void ImageSync::send_notify_sync_request() { m_work_queue, create_context_callback< ImageSync, &ImageSync::handle_notify_sync_request>(this)); m_instance_watcher->notify_sync_request(m_local_image_ctx->id, ctx); - m_lock.Unlock(); + m_lock.unlock(); } template void ImageSync::handle_notify_sync_request(int r) { dout(10) << ": r=" << r << dendl; - m_lock.Lock(); + m_lock.lock(); if (r == 0 && m_canceled) { r = -ECANCELED; } - m_lock.Unlock(); + m_lock.unlock(); if (r < 0) { BaseRequest::finish(r); @@ -207,7 +207,7 @@ void ImageSync::send_copy_image() { librbd::deep_copy::ObjectNumber object_number; int r = 0; { - RWLock::RLocker image_locker(m_remote_image_ctx->image_lock); + std::shared_lock image_locker{m_remote_image_ctx->image_lock}; ceph_assert(!m_client_meta->sync_points.empty()); auto &sync_point = m_client_meta->sync_points.front(); snap_id_end = m_remote_image_ctx->get_snap_id( @@ -231,9 +231,9 @@ void ImageSync::send_copy_image() { return; } - m_lock.Lock(); + m_lock.lock(); if (m_canceled) { - m_lock.Unlock(); + m_lock.unlock(); finish(-ECANCELED); return; } @@ -248,7 +248,7 @@ void ImageSync::send_copy_image() { false, object_number, m_work_queue, &m_client_meta->snap_seqs, m_image_copy_prog_ctx, ctx); m_image_copy_request->get(); - m_lock.Unlock(); + m_lock.unlock(); update_progress("COPY_IMAGE"); @@ -260,8 +260,7 @@ void ImageSync::handle_copy_image(int r) { dout(10) << ": r=" << r << dendl; { - Mutex::Locker timer_locker(*m_timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{*m_timer_lock, m_lock}; m_image_copy_request->put(); m_image_copy_request = nullptr; delete m_image_copy_prog_ctx; @@ -300,7 +299,7 @@ void ImageSync::handle_copy_image_update_progress(uint64_t object_no, int percent = 100 * object_no / object_count; update_progress("COPY_IMAGE " + stringify(percent) + "%"); - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_image_copy_object_no = object_no; m_image_copy_object_count = object_count; @@ -311,7 +310,7 @@ void ImageSync::handle_copy_image_update_progress(uint64_t object_no, template void ImageSync::send_update_sync_point() { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); m_update_sync_ctx = nullptr; @@ -361,14 +360,13 @@ void ImageSync::handle_update_sync_point(int r) { } { - Mutex::Locker timer_locker(*m_timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{*m_timer_lock, m_lock}; m_updating_sync_point = false; if (m_image_copy_request != nullptr) { m_update_sync_ctx = new FunctionContext( [this](int r) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; this->send_update_sync_point(); }); m_timer->add_event_after(m_update_sync_point_interval, diff --git a/src/tools/rbd_mirror/ImageSync.h b/src/tools/rbd_mirror/ImageSync.h index 9e00c1290cf..1fcc14c7eaf 100644 --- a/src/tools/rbd_mirror/ImageSync.h +++ b/src/tools/rbd_mirror/ImageSync.h @@ -8,7 +8,7 @@ #include "librbd/ImageCtx.h" #include "librbd/journal/TypeTraits.h" #include "librbd/journal/Types.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "tools/rbd_mirror/BaseRequest.h" #include #include @@ -36,7 +36,7 @@ public: static ImageSync* create(ImageCtxT *local_image_ctx, ImageCtxT *remote_image_ctx, - SafeTimer *timer, Mutex *timer_lock, + SafeTimer *timer, ceph::mutex *timer_lock, const std::string &mirror_uuid, Journaler *journaler, MirrorPeerClientMeta *client_meta, @@ -50,7 +50,7 @@ public: } ImageSync(ImageCtxT *local_image_ctx, ImageCtxT *remote_image_ctx, - SafeTimer *timer, Mutex *timer_lock, const std::string &mirror_uuid, + SafeTimer *timer, ceph::mutex *timer_lock, const std::string &mirror_uuid, Journaler *journaler, MirrorPeerClientMeta *client_meta, ContextWQ *work_queue, InstanceWatcher *instance_watcher, Context *on_finish, ProgressContext *progress_ctx = nullptr); @@ -99,7 +99,7 @@ private: ImageCtxT *m_local_image_ctx; ImageCtxT *m_remote_image_ctx; SafeTimer *m_timer; - Mutex *m_timer_lock; + ceph::mutex *m_timer_lock; std::string m_mirror_uuid; Journaler *m_journaler; MirrorPeerClientMeta *m_client_meta; @@ -109,7 +109,7 @@ private: SnapMap m_snap_map; - Mutex m_lock; + ceph::mutex m_lock; bool m_canceled = false; librbd::DeepCopyRequest *m_image_copy_request = nullptr; diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.cc b/src/tools/rbd_mirror/ImageSyncThrottler.cc index b395a012709..ccbb68790b6 100644 --- a/src/tools/rbd_mirror/ImageSyncThrottler.cc +++ b/src/tools/rbd_mirror/ImageSyncThrottler.cc @@ -30,8 +30,9 @@ namespace mirror { template ImageSyncThrottler::ImageSyncThrottler(CephContext *cct) : m_cct(cct), - m_lock(librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", - this)), + m_lock(ceph::make_mutex( + librbd::util::unique_lock_name("rbd::mirror::ImageSyncThrottler", + this))), m_max_concurrent_syncs(cct->_conf.get_val( "rbd_mirror_concurrent_image_syncs")) { dout(20) << "max_concurrent_syncs=" << m_max_concurrent_syncs << dendl; @@ -42,7 +43,7 @@ template ImageSyncThrottler::~ImageSyncThrottler() { m_cct->_conf.remove_observer(this); - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_inflight_ops.empty()); ceph_assert(m_queue.empty()); } @@ -53,7 +54,7 @@ void ImageSyncThrottler::start_op(const std::string &id, Context *on_start) { int r = 0; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_inflight_ops.count(id) > 0) { dout(20) << "duplicate for already started op " << id << dendl; @@ -86,7 +87,7 @@ bool ImageSyncThrottler::cancel_op(const std::string &id) { Context *on_start = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto it = m_queued_ops.find(id); if (it != m_queued_ops.end()) { dout(20) << "canceled queued sync for " << id << dendl; @@ -114,7 +115,7 @@ void ImageSyncThrottler::finish_op(const std::string &id) { Context *on_start = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_inflight_ops.erase(id); @@ -143,7 +144,7 @@ void ImageSyncThrottler::drain(int r) { std::map queued_ops; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; std::swap(m_queued_ops, queued_ops); m_queue.clear(); m_inflight_ops.clear(); @@ -160,7 +161,7 @@ void ImageSyncThrottler::set_max_concurrent_syncs(uint32_t max) { std::list ops; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_max_concurrent_syncs = max; // Start waiting ops in the case of available free slots @@ -186,10 +187,10 @@ void ImageSyncThrottler::set_max_concurrent_syncs(uint32_t max) { } template -void ImageSyncThrottler::print_status(Formatter *f, std::stringstream *ss) { +void ImageSyncThrottler::print_status(ceph::Formatter *f, std::stringstream *ss) { dout(20) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (f) { f->dump_int("max_parallel_syncs", m_max_concurrent_syncs); diff --git a/src/tools/rbd_mirror/ImageSyncThrottler.h b/src/tools/rbd_mirror/ImageSyncThrottler.h index c0cda61e9a6..797c67cc817 100644 --- a/src/tools/rbd_mirror/ImageSyncThrottler.h +++ b/src/tools/rbd_mirror/ImageSyncThrottler.h @@ -11,7 +11,7 @@ #include #include -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/config_obs.h" class CephContext; @@ -42,11 +42,11 @@ public: void finish_op(const std::string &id); void drain(int r); - void print_status(Formatter *f, std::stringstream *ss); + void print_status(ceph::Formatter *f, std::stringstream *ss); private: CephContext *m_cct; - Mutex m_lock; + ceph::mutex m_lock; uint32_t m_max_concurrent_syncs; std::list m_queue; std::map m_queued_ops; diff --git a/src/tools/rbd_mirror/InstanceReplayer.cc b/src/tools/rbd_mirror/InstanceReplayer.cc index ba40f8b51fc..039efb0c530 100644 --- a/src/tools/rbd_mirror/InstanceReplayer.cc +++ b/src/tools/rbd_mirror/InstanceReplayer.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "include/stringify.h" +#include "common/Cond.h" #include "common/Timer.h" #include "common/debug.h" #include "common/errno.h" @@ -39,7 +40,8 @@ InstanceReplayer::InstanceReplayer( : m_threads(threads), m_service_daemon(service_daemon), m_cache_manager_handler(cache_manager_handler), m_local_rados(local_rados), m_local_mirror_uuid(local_mirror_uuid), m_local_pool_id(local_pool_id), - m_lock("rbd::mirror::InstanceReplayer " + stringify(local_pool_id)) { + m_lock(ceph::make_mutex( + "rbd::mirror::InstanceReplayer " + stringify(local_pool_id))) { } template @@ -63,7 +65,7 @@ void InstanceReplayer::init(Context *on_finish) { Context *ctx = new FunctionContext( [this, on_finish] (int r) { { - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; schedule_image_state_check_task(); } on_finish->complete(0); @@ -84,7 +86,7 @@ template void InstanceReplayer::shut_down(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_shut_down == nullptr); m_on_shut_down = on_finish; @@ -103,7 +105,7 @@ void InstanceReplayer::add_peer(std::string peer_uuid, librados::IoCtx io_ctx) { dout(10) << peer_uuid << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto result = m_peers.insert(Peer(peer_uuid, io_ctx)).second; ceph_assert(result); } @@ -112,7 +114,7 @@ template void InstanceReplayer::release_all(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; C_Gather *gather_ctx = new C_Gather(g_ceph_context, on_finish); for (auto it = m_image_replayers.begin(); it != m_image_replayers.end(); @@ -135,7 +137,7 @@ void InstanceReplayer::acquire_image(InstanceWatcher *instance_watcher, Context *on_finish) { dout(10) << "global_image_id=" << global_image_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_shut_down == nullptr); @@ -173,7 +175,7 @@ void InstanceReplayer::release_image(const std::string &global_image_id, Context *on_finish) { dout(10) << "global_image_id=" << global_image_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_shut_down == nullptr); auto it = m_image_replayers.find(global_image_id); @@ -201,7 +203,7 @@ void InstanceReplayer::remove_peer_image(const std::string &global_image_id, dout(10) << "global_image_id=" << global_image_id << ", " << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_shut_down == nullptr); auto it = m_image_replayers.find(global_image_id); @@ -224,7 +226,7 @@ void InstanceReplayer::print_status(Formatter *f, stringstream *ss) { return; } - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; f->open_array_section("image_replayers"); for (auto &kv : m_image_replayers) { @@ -239,7 +241,7 @@ void InstanceReplayer::start() { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_manual_stop = false; @@ -254,7 +256,7 @@ void InstanceReplayer::stop() { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_manual_stop = true; @@ -269,7 +271,7 @@ void InstanceReplayer::restart() { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_manual_stop = false; @@ -284,7 +286,7 @@ void InstanceReplayer::flush() { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; for (auto &kv : m_image_replayers) { auto &image_replayer = kv.second; @@ -295,7 +297,7 @@ void InstanceReplayer::flush() template void InstanceReplayer::start_image_replayer( ImageReplayer *image_replayer) { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); std::string global_image_id = image_replayer->get_global_image_id(); if (!image_replayer->is_stopped()) { @@ -333,7 +335,7 @@ template void InstanceReplayer::start_image_replayers(int r) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_on_shut_down != nullptr) { return; } @@ -395,7 +397,7 @@ void InstanceReplayer::stop_image_replayer(ImageReplayer *image_replayer, << after << " sec (task " << ctx << ")" << dendl; ctx = new FunctionContext( [this, after, ctx] (int r) { - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; m_threads->timer->add_event_after(after, ctx); }); m_threads->work_queue->queue(ctx, 0); @@ -418,7 +420,7 @@ void InstanceReplayer::handle_wait_for_ops(int r) { ceph_assert(r == 0); - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; stop_image_replayers(); } @@ -426,7 +428,7 @@ template void InstanceReplayer::stop_image_replayers() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_threads->work_queue, create_context_callback, @@ -447,7 +449,7 @@ void InstanceReplayer::handle_stop_image_replayers(int r) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; for (auto &it : m_image_replayers) { ceph_assert(it.second->is_stopped()); @@ -463,7 +465,7 @@ void InstanceReplayer::handle_stop_image_replayers(int r) { template void InstanceReplayer::cancel_image_state_check_task() { - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; if (m_image_state_check_task == nullptr) { return; @@ -477,12 +479,12 @@ void InstanceReplayer::cancel_image_state_check_task() { template void InstanceReplayer::schedule_image_state_check_task() { - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(m_image_state_check_task == nullptr); m_image_state_check_task = new FunctionContext( [this](int r) { - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); m_image_state_check_task = nullptr; schedule_image_state_check_task(); queue_start_image_replayers(); diff --git a/src/tools/rbd_mirror/InstanceReplayer.h b/src/tools/rbd_mirror/InstanceReplayer.h index 80d682c7b76..5aa7fcb0580 100644 --- a/src/tools/rbd_mirror/InstanceReplayer.h +++ b/src/tools/rbd_mirror/InstanceReplayer.h @@ -9,7 +9,7 @@ #include "common/AsyncOpTracker.h" #include "common/Formatter.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "tools/rbd_mirror/Types.h" namespace journal { struct CacheManagerHandler; } @@ -92,7 +92,7 @@ private: std::string m_local_mirror_uuid; int64_t m_local_pool_id; - Mutex m_lock; + ceph::mutex m_lock; AsyncOpTracker m_async_op_tracker; std::map *> m_image_replayers; Peers m_peers; diff --git a/src/tools/rbd_mirror/InstanceWatcher.cc b/src/tools/rbd_mirror/InstanceWatcher.cc index d9e1ba23345..9f8b1d7c3f0 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.cc +++ b/src/tools/rbd_mirror/InstanceWatcher.cc @@ -102,7 +102,7 @@ struct InstanceWatcher::C_NotifyInstanceRequest : public Context { << ": instance_watcher=" << instance_watcher << ", instance_id=" << instance_id << ", request_id=" << request_id << dendl; - ceph_assert(instance_watcher->m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock)); if (!send_to_leader) { ceph_assert((!instance_id.empty())); @@ -121,7 +121,7 @@ struct InstanceWatcher::C_NotifyInstanceRequest : public Context { void send() { dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; - ceph_assert(instance_watcher->m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock)); if (canceling) { dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ @@ -164,7 +164,7 @@ struct InstanceWatcher::C_NotifyInstanceRequest : public Context { void cancel() { dout(10) << "C_NotifyInstanceRequest: " << this << " " << __func__ << dendl; - ceph_assert(instance_watcher->m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(instance_watcher->m_lock)); canceling = true; instance_watcher->unsuspend_notify_request(this); @@ -213,7 +213,7 @@ struct InstanceWatcher::C_NotifyInstanceRequest : public Context { if (r == -ETIMEDOUT) { derr << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": resending after timeout" << dendl; - Mutex::Locker locker(instance_watcher->m_lock); + std::lock_guard locker{instance_watcher->m_lock}; send(); return; } else { @@ -223,7 +223,7 @@ struct InstanceWatcher::C_NotifyInstanceRequest : public Context { if (r == -ESTALE && send_to_leader) { derr << "C_NotifyInstanceRequest: " << this << " " << __func__ << ": resending due to leader change" << dendl; - Mutex::Locker locker(instance_watcher->m_lock); + std::lock_guard locker{instance_watcher->m_lock}; send(); return; } @@ -233,7 +233,7 @@ struct InstanceWatcher::C_NotifyInstanceRequest : public Context { on_finish->complete(r); { - Mutex::Locker locker(instance_watcher->m_lock); + std::lock_guard locker{instance_watcher->m_lock}; auto result = instance_watcher->m_notify_ops.erase( std::make_pair(instance_id, this)); ceph_assert(result > 0); @@ -324,7 +324,8 @@ InstanceWatcher::InstanceWatcher(librados::IoCtx &io_ctx, const std::string &instance_id) : Watcher(io_ctx, work_queue, RBD_MIRROR_INSTANCE_PREFIX + instance_id), m_instance_replayer(instance_replayer), m_instance_id(instance_id), - m_lock(unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this)), + m_lock(ceph::make_mutex( + unique_lock_name("rbd::mirror::InstanceWatcher::m_lock", this))), m_instance_lock(librbd::ManagedLock::create( m_ioctx, m_work_queue, m_oid, this, librbd::managed_lock::EXCLUSIVE, true, m_cct->_conf.get_val("rbd_blacklist_expire_seconds"))) { @@ -352,7 +353,7 @@ template void InstanceWatcher::init(Context *on_finish) { dout(10) << "instance_id=" << m_instance_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; @@ -373,7 +374,7 @@ template void InstanceWatcher::shut_down(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; @@ -386,7 +387,7 @@ template void InstanceWatcher::remove(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; @@ -402,7 +403,7 @@ void InstanceWatcher::notify_image_acquire( dout(10) << "instance_id=" << instance_id << ", global_image_id=" << global_image_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); @@ -421,7 +422,7 @@ void InstanceWatcher::notify_image_release( dout(10) << "instance_id=" << instance_id << ", global_image_id=" << global_image_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); @@ -441,7 +442,7 @@ void InstanceWatcher::notify_peer_image_removed( << "global_image_id=" << global_image_id << ", " << "peer_mirror_uuid=" << peer_mirror_uuid << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); uint64_t request_id = ++m_request_seq; @@ -458,7 +459,7 @@ void InstanceWatcher::notify_sync_request(const std::string &sync_id, Context *on_sync_start) { dout(10) << "sync_id=" << sync_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_inflight_sync_reqs.count(sync_id) == 0); @@ -479,7 +480,7 @@ template bool InstanceWatcher::cancel_sync_request(const std::string &sync_id) { dout(10) << "sync_id=" << sync_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto it = m_inflight_sync_reqs.find(sync_id); if (it == m_inflight_sync_reqs.end()) { @@ -502,7 +503,7 @@ void InstanceWatcher::notify_sync_start(const std::string &instance_id, const std::string &sync_id) { dout(10) << "sync_id=" << sync_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; uint64_t request_id = ++m_request_seq; @@ -512,7 +513,7 @@ void InstanceWatcher::notify_sync_start(const std::string &instance_id, auto ctx = new FunctionContext( [this, sync_id] (int r) { dout(10) << "finish: sync_id=" << sync_id << ", r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r != -ESTALE && m_image_sync_throttler != nullptr) { m_image_sync_throttler->finish_op(sync_id); } @@ -524,15 +525,15 @@ void InstanceWatcher::notify_sync_start(const std::string &instance_id, template void InstanceWatcher::notify_sync_complete(const std::string &sync_id) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; notify_sync_complete(m_lock, sync_id); } template -void InstanceWatcher::notify_sync_complete(const Mutex&, +void InstanceWatcher::notify_sync_complete(const ceph::mutex&, const std::string &sync_id) { dout(10) << "sync_id=" << sync_id << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); auto it = m_inflight_sync_reqs.find(sync_id); ceph_assert(it != m_inflight_sync_reqs.end()); @@ -551,7 +552,7 @@ void InstanceWatcher::handle_notify_sync_request(C_SyncRequest *sync_ctx, Context *on_start = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(sync_ctx->req != nullptr); ceph_assert(sync_ctx->on_start != nullptr); @@ -584,7 +585,7 @@ template void InstanceWatcher::print_sync_status(Formatter *f, stringstream *ss) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_image_sync_throttler != nullptr) { m_image_sync_throttler->print_status(f, ss); } @@ -594,7 +595,7 @@ template void InstanceWatcher::handle_acquire_leader() { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_image_sync_throttler == nullptr); m_image_sync_throttler = ImageSyncThrottler::create(m_cct); @@ -607,7 +608,7 @@ template void InstanceWatcher::handle_release_leader() { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_image_sync_throttler != nullptr); @@ -623,7 +624,7 @@ void InstanceWatcher::handle_update_leader( const std::string &leader_instance_id) { dout(10) << "leader_instance_id=" << leader_instance_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_leader_instance_id = leader_instance_id; @@ -637,7 +638,7 @@ void InstanceWatcher::cancel_notify_requests( const std::string &instance_id) { dout(10) << "instance_id=" << instance_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; for (auto op : m_notify_ops) { if (op.first == instance_id && !op.second->send_to_leader) { @@ -648,7 +649,7 @@ void InstanceWatcher::cancel_notify_requests( template void InstanceWatcher::register_instance() { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); dout(10) << dendl; @@ -668,7 +669,7 @@ void InstanceWatcher::handle_register_instance(int r) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r == 0) { create_instance_object(); @@ -687,7 +688,7 @@ template void InstanceWatcher::create_instance_object() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); librados::ObjectWriteOperation op; op.create(true); @@ -704,7 +705,7 @@ template void InstanceWatcher::handle_create_instance_object(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r < 0) { derr << "error creating " << m_oid << " object: " << cpp_strerror(r) @@ -722,7 +723,7 @@ template void InstanceWatcher::register_watch() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -735,7 +736,7 @@ template void InstanceWatcher::handle_register_watch(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r < 0) { derr << "error registering instance watcher for " << m_oid << " object: " @@ -753,7 +754,7 @@ template void InstanceWatcher::acquire_lock() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -768,7 +769,7 @@ void InstanceWatcher::handle_acquire_lock(int r) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r < 0) { @@ -789,7 +790,7 @@ template void InstanceWatcher::release_lock() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -802,7 +803,7 @@ template void InstanceWatcher::handle_release_lock(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r < 0) { derr << "error releasing instance lock: " << cpp_strerror(r) << dendl; @@ -815,7 +816,7 @@ template void InstanceWatcher::unregister_watch() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -833,13 +834,13 @@ void InstanceWatcher::handle_unregister_watch(int r) { << cpp_strerror(r) << dendl; } - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; remove_instance_object(); } template void InstanceWatcher::remove_instance_object() { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); dout(10) << dendl; @@ -867,7 +868,7 @@ void InstanceWatcher::handle_remove_instance_object(int r) { << dendl; } - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; unregister_instance(); } @@ -875,7 +876,7 @@ template void InstanceWatcher::unregister_instance() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); librados::ObjectWriteOperation op; librbd::cls_client::mirror_instances_remove(&op, m_instance_id); @@ -895,7 +896,7 @@ void InstanceWatcher::handle_unregister_instance(int r) { derr << "error unregistering instance: " << cpp_strerror(r) << dendl; } - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; wait_for_notify_ops(); } @@ -903,7 +904,7 @@ template void InstanceWatcher::wait_for_notify_ops() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); for (auto op : m_notify_ops) { op.second->cancel(); @@ -924,7 +925,7 @@ void InstanceWatcher::handle_wait_for_notify_ops(int r) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_notify_ops.empty()); @@ -938,7 +939,7 @@ template void InstanceWatcher::get_instance_locker() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -951,7 +952,7 @@ template void InstanceWatcher::handle_get_instance_locker(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r < 0) { if (r != -ENOENT) { @@ -968,7 +969,7 @@ template void InstanceWatcher::break_instance_lock() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -981,7 +982,7 @@ template void InstanceWatcher::handle_break_instance_lock(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r < 0) { if (r != -ENOENT) { @@ -998,7 +999,7 @@ template void InstanceWatcher::suspend_notify_request(C_NotifyInstanceRequest *req) { dout(10) << req << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); auto result = m_suspended_ops.insert(req).second; ceph_assert(result); @@ -1009,7 +1010,7 @@ bool InstanceWatcher::unsuspend_notify_request( C_NotifyInstanceRequest *req) { dout(10) << req << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); auto result = m_suspended_ops.erase(req); if (result == 0) { @@ -1024,7 +1025,7 @@ template void InstanceWatcher::unsuspend_notify_requests() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); std::set suspended_ops; std::swap(m_suspended_ops, suspended_ops); @@ -1041,7 +1042,7 @@ Context *InstanceWatcher::prepare_request(const std::string &instance_id, dout(10) << "instance_id=" << instance_id << ", request_id=" << request_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; Context *ctx = nullptr; Request request(instance_id, request_id); @@ -1072,7 +1073,7 @@ void InstanceWatcher::complete_request(const std::string &instance_id, C_NotifyAck *on_notify_ack; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; Request request(instance_id, request_id); auto it = m_requests.find(request); ceph_assert(it != m_requests.end()); @@ -1160,7 +1161,7 @@ void InstanceWatcher::handle_sync_request(const std::string &instance_id, Context *on_finish) { dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_image_sync_throttler == nullptr) { dout(10) << "sync request for non-leader" << dendl; @@ -1190,7 +1191,7 @@ void InstanceWatcher::handle_sync_start(const std::string &instance_id, Context *on_finish) { dout(10) << "instance_id=" << instance_id << ", sync_id=" << sync_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto it = m_inflight_sync_reqs.find(sync_id); if (it == m_inflight_sync_reqs.end()) { diff --git a/src/tools/rbd_mirror/InstanceWatcher.h b/src/tools/rbd_mirror/InstanceWatcher.h index 5ec1aef0f3c..ec3b56affeb 100644 --- a/src/tools/rbd_mirror/InstanceWatcher.h +++ b/src/tools/rbd_mirror/InstanceWatcher.h @@ -159,7 +159,7 @@ private: InstanceReplayer *m_instance_replayer; std::string m_instance_id; - mutable Mutex m_lock; + mutable ceph::mutex m_lock; librbd::ManagedLock *m_instance_lock; Context *m_on_finish = nullptr; int m_ret_val = 0; @@ -210,7 +210,7 @@ private: bool unsuspend_notify_request(C_NotifyInstanceRequest *req); void unsuspend_notify_requests(); - void notify_sync_complete(const Mutex& lock, const std::string &sync_id); + void notify_sync_complete(const ceph::mutex& lock, const std::string &sync_id); void handle_notify_sync_request(C_SyncRequest *sync_ctx, int r); void handle_notify_sync_complete(C_SyncRequest *sync_ctx, int r); diff --git a/src/tools/rbd_mirror/Instances.cc b/src/tools/rbd_mirror/Instances.cc index b7a6cf11431..13d678d7cea 100644 --- a/src/tools/rbd_mirror/Instances.cc +++ b/src/tools/rbd_mirror/Instances.cc @@ -30,7 +30,7 @@ Instances::Instances(Threads *threads, librados::IoCtx &ioctx, instances::Listener& listener) : m_threads(threads), m_ioctx(ioctx), m_instance_id(instance_id), m_listener(listener), m_cct(reinterpret_cast(ioctx.cct())), - m_lock("rbd::mirror::Instances " + ioctx.get_pool_name()) { + m_lock(ceph::make_mutex("rbd::mirror::Instances " + ioctx.get_pool_name())) { } template @@ -41,7 +41,7 @@ template void Instances::init(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; get_instances(); @@ -51,14 +51,13 @@ template void Instances::shut_down(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; Context *ctx = new FunctionContext( [this](int r) { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; cancel_remove_task(); wait_for_ops(); }); @@ -70,7 +69,7 @@ template void Instances::unblock_listener() { dout(5) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_listener_blocked); m_listener_blocked = false; @@ -91,7 +90,7 @@ template void Instances::acked(const InstanceIds& instance_ids) { dout(10) << "instance_ids=" << instance_ids << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_on_finish != nullptr) { dout(5) << "received on shut down, ignoring" << dendl; return; @@ -105,15 +104,14 @@ template void Instances::handle_acked(const InstanceIds& instance_ids) { dout(5) << "instance_ids=" << instance_ids << dendl; - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (m_on_finish != nullptr) { dout(5) << "handled on shut down, ignoring" << dendl; return; } InstanceIds added_instance_ids; - auto time = ceph_clock_now(); + auto time = clock_t::now(); for (auto& instance_id : instance_ids) { auto &instance = m_instances.insert( std::make_pair(instance_id, Instance{})).first->second; @@ -132,7 +130,7 @@ void Instances::handle_acked(const InstanceIds& instance_ids) { template void Instances::notify_instances_added(const InstanceIds& instance_ids) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; InstanceIds added_instance_ids; for (auto& instance_id : instance_ids) { auto it = m_instances.find(instance_id); @@ -146,9 +144,9 @@ void Instances::notify_instances_added(const InstanceIds& instance_ids) { } dout(5) << "instance_ids=" << added_instance_ids << dendl; - m_lock.Unlock(); + m_lock.unlock(); m_listener.handle_added(added_instance_ids); - m_lock.Lock(); + m_lock.lock(); for (auto& instance_id : added_instance_ids) { auto it = m_instances.find(instance_id); @@ -163,7 +161,7 @@ void Instances::notify_instances_removed(const InstanceIds& instance_ids) { dout(5) << "instance_ids=" << instance_ids << dendl; m_listener.handle_removed(instance_ids); - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; for (auto& instance_id : instance_ids) { m_instances.erase(instance_id); } @@ -173,7 +171,7 @@ template void Instances::list(std::vector *instance_ids) { dout(20) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; for (auto it : m_instances) { instance_ids->push_back(it.first); @@ -185,7 +183,7 @@ template void Instances::get_instances() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_context_callback< Instances, &Instances::handle_get_instances>(this); @@ -199,7 +197,7 @@ void Instances::handle_get_instances(int r) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; std::swap(on_finish, m_on_finish); } @@ -215,7 +213,7 @@ template void Instances::wait_for_ops() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_threads->work_queue, create_context_callback< @@ -232,15 +230,15 @@ void Instances::handle_wait_for_ops(int r) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; std::swap(on_finish, m_on_finish); } on_finish->complete(r); } template -void Instances::remove_instances(const utime_t& time) { - ceph_assert(m_lock.is_locked()); +void Instances::remove_instances(const Instances::clock_t::time_point& time) { + ceph_assert(ceph_mutex_is_locked(m_lock)); InstanceIds instance_ids; for (auto& instance_pair : m_instances) { @@ -275,8 +273,7 @@ void Instances::remove_instances(const utime_t& time) { template void Instances::handle_remove_instances( int r, const InstanceIds& instance_ids) { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; dout(10) << "r=" << r << ", instance_ids=" << instance_ids << dendl; ceph_assert(r == 0); @@ -286,14 +283,14 @@ void Instances::handle_remove_instances( new C_NotifyInstancesRemoved(this, instance_ids), 0); // reschedule the timer for the next batch - schedule_remove_task(ceph_clock_now()); + schedule_remove_task(clock_t::now()); m_async_op_tracker.finish_op(); } template void Instances::cancel_remove_task() { - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (m_timer_task == nullptr) { return; @@ -307,7 +304,7 @@ void Instances::cancel_remove_task() { } template -void Instances::schedule_remove_task(const utime_t& time) { +void Instances::schedule_remove_task(const Instances::clock_t::time_point& time) { cancel_remove_task(); if (m_on_finish != nullptr) { dout(10) << "received on shut down, ignoring" << dendl; @@ -319,7 +316,7 @@ void Instances::schedule_remove_task(const utime_t& time) { m_cct->_conf.get_val("rbd_mirror_leader_max_acquire_attempts_before_break")); bool schedule = false; - utime_t oldest_time = time; + auto oldest_time = time; for (auto& instance : m_instances) { if (instance.first == m_instance_id) { continue; @@ -342,14 +339,14 @@ void Instances::schedule_remove_task(const utime_t& time) { // schedule a time to fire when the oldest instance should be removed m_timer_task = new FunctionContext( [this, oldest_time](int r) { - ceph_assert(m_threads->timer_lock.is_locked()); - Mutex::Locker locker(m_lock); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + std::lock_guard locker{m_lock}; m_timer_task = nullptr; remove_instances(oldest_time); }); - oldest_time += after; + oldest_time += ceph::make_timespan(after); m_threads->timer->add_event_at(oldest_time, m_timer_task); } diff --git a/src/tools/rbd_mirror/Instances.h b/src/tools/rbd_mirror/Instances.h index dbfb16df2c0..e6e104b7320 100644 --- a/src/tools/rbd_mirror/Instances.h +++ b/src/tools/rbd_mirror/Instances.h @@ -10,7 +10,7 @@ #include "include/buffer_fwd.h" #include "include/rados/librados_fwd.hpp" #include "common/AsyncOpTracker.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "librbd/Watcher.h" #include "tools/rbd_mirror/instances/Types.h" @@ -74,8 +74,9 @@ private: INSTANCE_STATE_REMOVING }; + using clock_t = ceph::real_clock; struct Instance { - utime_t acked_time{}; + clock_t::time_point acked_time{}; InstanceState state = INSTANCE_STATE_ADDING; }; @@ -134,7 +135,7 @@ private: instances::Listener& m_listener; CephContext *m_cct; - Mutex m_lock; + ceph::mutex m_lock; InstanceIds m_instance_ids; std::map m_instances; Context *m_on_finish = nullptr; @@ -154,11 +155,11 @@ private: void wait_for_ops(); void handle_wait_for_ops(int r); - void remove_instances(const utime_t& time); + void remove_instances(const clock_t::time_point& time); void handle_remove_instances(int r, const InstanceIds& instance_ids); void cancel_remove_task(); - void schedule_remove_task(const utime_t& time); + void schedule_remove_task(const clock_t::time_point& time); }; } // namespace mirror diff --git a/src/tools/rbd_mirror/LeaderWatcher.cc b/src/tools/rbd_mirror/LeaderWatcher.cc index f3291bc6096..801f296ad2e 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.cc +++ b/src/tools/rbd_mirror/LeaderWatcher.cc @@ -2,6 +2,7 @@ // vim: ts=8 sw=2 smarttab #include "LeaderWatcher.h" +#include "common/Cond.h" #include "common/Timer.h" #include "common/debug.h" #include "common/errno.h" @@ -30,7 +31,8 @@ LeaderWatcher::LeaderWatcher(Threads *threads, librados::IoCtx &io_ctx, leader_watcher::Listener *listener) : Watcher(io_ctx, threads->work_queue, RBD_MIRROR_LEADER), m_threads(threads), m_listener(listener), m_instances_listener(this), - m_lock("rbd::mirror::LeaderWatcher " + io_ctx.get_pool_name()), + m_lock(ceph::make_mutex("rbd::mirror::LeaderWatcher " + + io_ctx.get_pool_name())), m_notifier_id(librados::Rados(io_ctx).get_instance_id()), m_instance_id(stringify(m_notifier_id)), m_leader_lock(new LeaderLock(m_ioctx, m_work_queue, m_oid, this, true, @@ -63,7 +65,7 @@ template void LeaderWatcher::init(Context *on_finish) { dout(10) << "notifier_id=" << m_notifier_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; @@ -75,7 +77,7 @@ template void LeaderWatcher::create_leader_object() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); librados::ObjectWriteOperation op; op.create(false); @@ -93,7 +95,7 @@ void LeaderWatcher::handle_create_leader_object(int r) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r == 0) { register_watch(); @@ -112,7 +114,7 @@ template void LeaderWatcher::register_watch() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -127,13 +129,13 @@ void LeaderWatcher::handle_register_watch(int r) { Context *on_finish = nullptr; if (r < 0) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; derr << "error registering leader watcher for " << m_oid << " object: " << cpp_strerror(r) << dendl; ceph_assert(m_on_finish != nullptr); std::swap(on_finish, m_on_finish); } else { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; init_status_watcher(); return; } @@ -153,8 +155,7 @@ template void LeaderWatcher::shut_down(Context *on_finish) { dout(10) << dendl; - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; ceph_assert(m_on_shut_down_finish == nullptr); m_on_shut_down_finish = on_finish; @@ -166,7 +167,7 @@ template void LeaderWatcher::shut_down_leader_lock() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -179,7 +180,7 @@ template void LeaderWatcher::handle_shut_down_leader_lock(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r < 0) { derr << "error shutting down leader lock: " << cpp_strerror(r) << dendl; @@ -192,7 +193,7 @@ template void LeaderWatcher::unregister_watch() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -216,8 +217,7 @@ template void LeaderWatcher::wait_for_tasks() { dout(10) << dendl; - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; schedule_timer_task("wait for tasks", 0, false, &LeaderWatcher::handle_wait_for_tasks, true); } @@ -226,8 +226,8 @@ template void LeaderWatcher::handle_wait_for_tasks() { dout(10) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_on_shut_down_finish != nullptr); ceph_assert(!m_timer_op_tracker.empty()); @@ -237,7 +237,7 @@ void LeaderWatcher::handle_wait_for_tasks() { Context *on_finish; { // ensure lock isn't held when completing shut down - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_shut_down_finish != nullptr); on_finish = m_on_shut_down_finish; } @@ -248,14 +248,13 @@ void LeaderWatcher::handle_wait_for_tasks() { template bool LeaderWatcher::is_leader() const { - Mutex::Locker locker(m_lock); - + std::lock_guard locker{m_lock}; return is_leader(m_lock); } template -bool LeaderWatcher::is_leader(Mutex &lock) const { - ceph_assert(m_lock.is_locked()); +bool LeaderWatcher::is_leader(ceph::mutex &lock) const { + ceph_assert(ceph_mutex_is_locked(m_lock)); bool leader = m_leader_lock->is_leader(); dout(10) << leader << dendl; @@ -264,14 +263,13 @@ bool LeaderWatcher::is_leader(Mutex &lock) const { template bool LeaderWatcher::is_releasing_leader() const { - Mutex::Locker locker(m_lock); - + std::lock_guard locker{m_lock}; return is_releasing_leader(m_lock); } template -bool LeaderWatcher::is_releasing_leader(Mutex &lock) const { - ceph_assert(m_lock.is_locked()); +bool LeaderWatcher::is_releasing_leader(ceph::mutex &lock) const { + ceph_assert(ceph_mutex_is_locked(m_lock)); bool releasing = m_leader_lock->is_releasing_leader(); dout(10) << releasing << dendl; @@ -282,7 +280,7 @@ template bool LeaderWatcher::get_leader_instance_id(std::string *instance_id) const { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (is_leader(m_lock) || is_releasing_leader(m_lock)) { *instance_id = m_instance_id; @@ -301,7 +299,7 @@ template void LeaderWatcher::release_leader() { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (!is_leader(m_lock)) { return; } @@ -313,7 +311,7 @@ template void LeaderWatcher::list_instances(std::vector *instance_ids) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; instance_ids->clear(); if (m_instances != nullptr) { @@ -323,8 +321,8 @@ void LeaderWatcher::list_instances(std::vector *instance_ids) { template void LeaderWatcher::cancel_timer_task() { - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (m_timer_task == nullptr) { return; @@ -341,8 +339,8 @@ void LeaderWatcher::schedule_timer_task(const std::string &name, int delay_factor, bool leader, TimerCallback timer_callback, bool shutting_down) { - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (!shutting_down && m_on_shut_down_finish != nullptr) { return; @@ -352,11 +350,11 @@ void LeaderWatcher::schedule_timer_task(const std::string &name, m_timer_task = new FunctionContext( [this, leader, timer_callback](int r) { - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); m_timer_task = nullptr; if (m_timer_op_tracker.empty()) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; execute_timer_task(leader, timer_callback); return; } @@ -384,8 +382,8 @@ void LeaderWatcher::execute_timer_task(bool leader, TimerCallback timer_callback) { dout(10) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_timer_op_tracker.empty()); if (is_leader(m_lock) != leader) { @@ -411,7 +409,7 @@ void LeaderWatcher::handle_post_acquire_leader_lock(int r, return; } - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; m_ret_val = 0; @@ -423,7 +421,7 @@ template void LeaderWatcher::handle_pre_release_leader_lock(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; m_ret_val = 0; @@ -441,7 +439,7 @@ void LeaderWatcher::handle_post_release_leader_lock(int r, return; } - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == nullptr); m_on_finish = on_finish; @@ -452,8 +450,8 @@ template void LeaderWatcher::break_leader_lock() { dout(10) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(!m_timer_op_tracker.empty()); if (m_locker.cookie.empty()) { @@ -472,8 +470,7 @@ template void LeaderWatcher::handle_break_leader_lock(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; ceph_assert(!m_timer_op_tracker.empty()); if (m_leader_lock->is_shutdown()) { @@ -499,8 +496,8 @@ void LeaderWatcher::schedule_get_locker(bool reset_leader, uint32_t delay_factor) { dout(10) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); if (reset_leader) { m_locker = {}; @@ -515,8 +512,8 @@ template void LeaderWatcher::get_locker() { dout(10) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(!m_timer_op_tracker.empty()); C_GetLocker *get_locker_ctx = new C_GetLocker(this); @@ -530,8 +527,7 @@ void LeaderWatcher::handle_get_locker(int r, librbd::managed_lock::Locker& locker) { dout(10) << "r=" << r << dendl; - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker mutex_locker(m_lock); + std::scoped_lock l{m_threads->timer_lock, m_lock}; ceph_assert(!m_timer_op_tracker.empty()); if (m_leader_lock->is_shutdown()) { @@ -590,8 +586,7 @@ void LeaderWatcher::handle_get_locker(int r, if (get_leader_instance_id(&instance_id)) { m_listener->update_leader_handler(instance_id); } - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; m_timer_op_tracker.finish_op(); }); m_work_queue->queue(ctx, 0); @@ -601,8 +596,8 @@ template void LeaderWatcher::schedule_acquire_leader_lock(uint32_t delay_factor) { dout(10) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); schedule_timer_task("acquire leader lock", delay_factor * @@ -612,8 +607,8 @@ void LeaderWatcher::schedule_acquire_leader_lock(uint32_t delay_factor) { template void LeaderWatcher::acquire_leader_lock() { - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(!m_timer_op_tracker.empty()); ++m_acquire_attempts; @@ -629,8 +624,7 @@ template void LeaderWatcher::handle_acquire_leader_lock(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; ceph_assert(!m_timer_op_tracker.empty()); if (m_leader_lock->is_shutdown()) { @@ -667,7 +661,7 @@ template void LeaderWatcher::release_leader_lock() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -680,8 +674,7 @@ template void LeaderWatcher::handle_release_leader_lock(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (r < 0) { derr << "error releasing lock: " << cpp_strerror(r) << dendl; @@ -695,7 +688,7 @@ template void LeaderWatcher::init_status_watcher() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_status_watcher == nullptr); m_status_watcher = MirrorStatusWatcher::create(m_ioctx, m_work_queue); @@ -712,8 +705,7 @@ void LeaderWatcher::handle_init_status_watcher(int r) { Context *on_finish = nullptr; { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (r < 0) { derr << "error initializing mirror status watcher: " << cpp_strerror(r) @@ -733,7 +725,7 @@ template void LeaderWatcher::shut_down_status_watcher() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_status_watcher != nullptr); Context *ctx = create_async_context_callback( @@ -747,7 +739,7 @@ template void LeaderWatcher::handle_shut_down_status_watcher(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_status_watcher->destroy(); m_status_watcher = nullptr; @@ -763,7 +755,7 @@ template void LeaderWatcher::init_instances() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_instances == nullptr); m_instances = Instances::create(m_threads, m_ioctx, m_instance_id, @@ -781,7 +773,7 @@ void LeaderWatcher::handle_init_instances(int r) { Context *on_finish = nullptr; if (r < 0) { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; derr << "error initializing instances: " << cpp_strerror(r) << dendl; m_instances->destroy(); m_instances = nullptr; @@ -789,7 +781,7 @@ void LeaderWatcher::handle_init_instances(int r) { ceph_assert(m_on_finish != nullptr); std::swap(m_on_finish, on_finish); } else { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; notify_listener(); return; } @@ -801,7 +793,7 @@ template void LeaderWatcher::shut_down_instances() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(m_instances != nullptr); Context *ctx = create_async_context_callback( @@ -818,7 +810,7 @@ void LeaderWatcher::handle_shut_down_instances(int r) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_instances->destroy(); m_instances = nullptr; @@ -833,7 +825,7 @@ template void LeaderWatcher::notify_listener() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_async_context_callback( m_work_queue, create_context_callback< @@ -857,7 +849,7 @@ template void LeaderWatcher::handle_notify_listener(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r < 0) { derr << "error notifying listener: " << cpp_strerror(r) << dendl; @@ -875,7 +867,7 @@ template void LeaderWatcher::notify_lock_acquired() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_context_callback< LeaderWatcher, &LeaderWatcher::handle_notify_lock_acquired>(this); @@ -892,7 +884,7 @@ void LeaderWatcher::handle_notify_lock_acquired(int r) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r < 0 && r != -ETIMEDOUT) { derr << "error notifying leader lock acquired: " << cpp_strerror(r) << dendl; @@ -914,7 +906,7 @@ template void LeaderWatcher::notify_lock_released() { dout(10) << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); Context *ctx = create_context_callback< LeaderWatcher, &LeaderWatcher::handle_notify_lock_released>(this); @@ -931,7 +923,7 @@ void LeaderWatcher::handle_notify_lock_released(int r) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (r < 0 && r != -ETIMEDOUT) { derr << "error notifying leader lock released: " << cpp_strerror(r) << dendl; @@ -947,8 +939,8 @@ template void LeaderWatcher::notify_heartbeat() { dout(10) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); + ceph_assert(ceph_mutex_is_locked(m_lock)); ceph_assert(!m_timer_op_tracker.empty()); if (!is_leader(m_lock)) { @@ -971,8 +963,7 @@ template void LeaderWatcher::handle_notify_heartbeat(int r) { dout(10) << "r=" << r << dendl; - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; ceph_assert(!m_timer_op_tracker.empty()); m_timer_op_tracker.finish_op(); @@ -1011,8 +1002,7 @@ void LeaderWatcher::handle_heartbeat(Context *on_notify_ack) { dout(10) << dendl; { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (is_leader(m_lock)) { dout(5) << "got another leader heartbeat, ignoring" << dendl; } else { @@ -1030,8 +1020,7 @@ void LeaderWatcher::handle_lock_acquired(Context *on_notify_ack) { dout(10) << dendl; { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (is_leader(m_lock)) { dout(5) << "got another leader lock_acquired, ignoring" << dendl; } else { @@ -1048,8 +1037,7 @@ void LeaderWatcher::handle_lock_released(Context *on_notify_ack) { dout(10) << dendl; { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (is_leader(m_lock)) { dout(5) << "got another leader lock_released, ignoring" << dendl; } else { diff --git a/src/tools/rbd_mirror/LeaderWatcher.h b/src/tools/rbd_mirror/LeaderWatcher.h index 54ee5dc82c3..9d8e85f9e8a 100644 --- a/src/tools/rbd_mirror/LeaderWatcher.h +++ b/src/tools/rbd_mirror/LeaderWatcher.h @@ -129,12 +129,12 @@ private: } bool is_leader() const { - Mutex::Locker locker(Parent::m_lock); + std::lock_guard locker{Parent::m_lock}; return Parent::is_state_post_acquiring() || Parent::is_state_locked(); } bool is_releasing_leader() const { - Mutex::Locker locker(Parent::m_lock); + std::lock_guard locker{Parent::m_lock}; return Parent::is_state_pre_releasing(); } @@ -142,7 +142,7 @@ private: void post_acquire_lock_handler(int r, Context *on_finish) { if (r == 0) { // lock is owned at this point - Mutex::Locker locker(Parent::m_lock); + std::lock_guard locker{Parent::m_lock}; Parent::set_state_post_acquiring(); } watcher->handle_post_acquire_leader_lock(r, on_finish); @@ -208,7 +208,7 @@ private: leader_watcher::Listener *m_listener; InstancesListener m_instances_listener; - mutable Mutex m_lock; + mutable ceph::mutex m_lock; uint64_t m_notifier_id; std::string m_instance_id; LeaderLock *m_leader_lock; @@ -226,8 +226,8 @@ private: librbd::watcher::NotifyResponse m_heartbeat_response; - bool is_leader(Mutex &m_lock) const; - bool is_releasing_leader(Mutex &m_lock) const; + bool is_leader(ceph::mutex &m_lock) const; + bool is_releasing_leader(ceph::mutex &m_lock) const; void cancel_timer_task(); void schedule_timer_task(const std::string &name, diff --git a/src/tools/rbd_mirror/Mirror.cc b/src/tools/rbd_mirror/Mirror.cc index 9fc6d0b3a32..b36c1a9cc28 100644 --- a/src/tools/rbd_mirror/Mirror.cc +++ b/src/tools/rbd_mirror/Mirror.cc @@ -343,7 +343,7 @@ private: class CacheManagerHandler : public journal::CacheManagerHandler { public: CacheManagerHandler(CephContext *cct) - : m_cct(cct), m_lock("rbd::mirror::CacheManagerHandler") { + : m_cct(cct) { if (!m_cct->_conf.get_val("rbd_mirror_memory_autotune")) { return; @@ -375,7 +375,7 @@ public: } ~CacheManagerHandler() { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_caches.empty()); } @@ -391,7 +391,7 @@ public: dout(20) << cache_name << " min_size=" << min_size << " max_size=" << max_size << " handler=" << handler << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto p = m_caches.insert( {cache_name, {cache_name, min_size, max_size, handler}}); @@ -408,7 +408,7 @@ public: dout(20) << cache_name << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto it = m_caches.find(cache_name); ceph_assert(it != m_caches.end()); @@ -423,7 +423,7 @@ public: return; } - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; // Before we trim, check and see if it's time to rebalance/resize. auto autotune_interval = m_cct->_conf.get_val( @@ -472,7 +472,8 @@ private: CephContext *m_cct; - mutable Mutex m_lock; + mutable ceph::mutex m_lock = + ceph::make_mutex("rbd::mirror::CacheManagerHandler"); std::unique_ptr m_cache_manager; std::map m_caches; @@ -483,7 +484,6 @@ private: Mirror::Mirror(CephContext *cct, const std::vector &args) : m_cct(cct), m_args(args), - m_lock("rbd::mirror::Mirror"), m_local(new librados::Rados()), m_cache_manager_handler(new CacheManagerHandler(cct)), m_asok_hook(new MirrorAdminSocketHook(cct, this)) @@ -503,8 +503,8 @@ void Mirror::handle_signal(int signum) { m_stopping = true; { - Mutex::Locker l(m_lock); - m_cond.Signal(); + std::lock_guard l{m_lock}; + m_cond.notify_all(); } } @@ -548,18 +548,18 @@ void Mirror::run() next_refresh_pools += m_cct->_conf.get_val( "rbd_mirror_pool_replayers_refresh_interval"); } - Mutex::Locker l(m_lock); + std::unique_lock l{m_lock}; if (!m_manual_stop) { if (refresh_pools) { update_pool_replayers(m_local_cluster_watcher->get_pool_peers()); } m_cache_manager_handler->run_cache_manager(); } - m_cond.WaitInterval(m_lock, {1, 0}); + m_cond.wait_for(l, 1s); } // stop all pool replayers in parallel - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; for (auto &pool_replayer : m_pool_replayers) { pool_replayer.second->stop(false); } @@ -570,7 +570,7 @@ void Mirror::print_status(Formatter *f, stringstream *ss) { dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (m_stopping) { return; @@ -593,7 +593,7 @@ void Mirror::print_status(Formatter *f, stringstream *ss) void Mirror::start() { dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (m_stopping) { return; @@ -609,7 +609,7 @@ void Mirror::start() void Mirror::stop() { dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (m_stopping) { return; @@ -625,7 +625,7 @@ void Mirror::stop() void Mirror::restart() { dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (m_stopping) { return; @@ -641,7 +641,7 @@ void Mirror::restart() void Mirror::flush() { dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (m_stopping || m_manual_stop) { return; @@ -655,7 +655,7 @@ void Mirror::flush() void Mirror::release_leader() { dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (m_stopping) { return; @@ -669,7 +669,7 @@ void Mirror::release_leader() void Mirror::update_pool_replayers(const PoolPeers &pool_peers) { dout(20) << "enter" << dendl; - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); // remove stale pool replayers before creating new pool replayers for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) { diff --git a/src/tools/rbd_mirror/Mirror.h b/src/tools/rbd_mirror/Mirror.h index 12076be2c77..6af03af9f6d 100644 --- a/src/tools/rbd_mirror/Mirror.h +++ b/src/tools/rbd_mirror/Mirror.h @@ -5,7 +5,7 @@ #define CEPH_RBD_MIRROR_H #include "common/ceph_context.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "include/rados/librados.hpp" #include "include/utime.h" #include "ClusterWatcher.h" @@ -65,8 +65,8 @@ private: CephContext *m_cct; std::vector m_args; Threads *m_threads = nullptr; - Mutex m_lock; - Cond m_cond; + ceph::mutex m_lock = ceph::make_mutex("rbd::mirror::Mirror"); + ceph::condition_variable m_cond; RadosRef m_local; std::unique_ptr> m_service_daemon; diff --git a/src/tools/rbd_mirror/PoolReplayer.cc b/src/tools/rbd_mirror/PoolReplayer.cc index c41dffaa93c..4cc29ea0927 100644 --- a/src/tools/rbd_mirror/PoolReplayer.cc +++ b/src/tools/rbd_mirror/PoolReplayer.cc @@ -237,7 +237,7 @@ PoolReplayer::PoolReplayer( m_local_pool_id(local_pool_id), m_peer(peer), m_args(args), - m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)), + m_lock(ceph::make_mutex(stringify("rbd::mirror::PoolReplayer ") + stringify(peer))), m_local_pool_watcher_listener(this, true), m_remote_pool_watcher_listener(this, false), m_image_map_listener(this), @@ -255,13 +255,13 @@ PoolReplayer::~PoolReplayer() template bool PoolReplayer::is_blacklisted() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_blacklisted; } template bool PoolReplayer::is_leader() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_leader_watcher && m_leader_watcher->is_leader(); } @@ -379,8 +379,8 @@ template void PoolReplayer::shut_down() { m_stopping = true; { - Mutex::Locker l(m_lock); - m_cond.Signal(); + std::lock_guard l{m_lock}; + m_cond.notify_all(); } if (m_pool_replayer_thread.is_started()) { m_pool_replayer_thread.join(); @@ -550,7 +550,7 @@ void PoolReplayer::run() m_asok_hook_name, this); } - Mutex::Locker locker(m_lock); + std::unique_lock locker{m_lock}; if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) || (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) { m_blacklisted = true; @@ -559,7 +559,7 @@ void PoolReplayer::run() } if (!m_stopping) { - m_cond.WaitInterval(m_lock, utime_t(1, 0)); + m_cond.wait_for(locker, 1s); } } @@ -575,7 +575,7 @@ void PoolReplayer::print_status(Formatter *f, stringstream *ss) return; } - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; f->open_object_section("pool_replayer_status"); f->dump_string("pool", m_local_io_ctx.get_pool_name()); @@ -634,7 +634,7 @@ void PoolReplayer::start() { dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (m_stopping) { return; @@ -649,10 +649,10 @@ void PoolReplayer::stop(bool manual) { dout(20) << "enter: manual=" << manual << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (!manual) { m_stopping = true; - m_cond.Signal(); + m_cond.notify_all(); return; } else if (m_stopping) { return; @@ -667,7 +667,7 @@ void PoolReplayer::restart() { dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (m_stopping) { return; @@ -681,7 +681,7 @@ void PoolReplayer::flush() { dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (m_stopping || m_manual_stop) { return; @@ -695,7 +695,7 @@ void PoolReplayer::release_leader() { dout(20) << "enter" << dendl; - Mutex::Locker l(m_lock); + std::lock_guard l{m_lock}; if (m_stopping || !m_leader_watcher) { return; @@ -715,7 +715,7 @@ void PoolReplayer::handle_update(const std::string &mirror_uuid, dout(10) << "mirror_uuid=" << mirror_uuid << ", " << "added_count=" << added_image_ids.size() << ", " << "removed_count=" << removed_image_ids.size() << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (!m_leader_watcher->is_leader()) { return; } @@ -768,7 +768,7 @@ template void PoolReplayer::init_image_map(Context *on_finish) { dout(5) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_image_map); m_image_map.reset(ImageMap::create(m_local_io_ctx, m_threads, m_instance_watcher->get_instance_id(), @@ -800,7 +800,7 @@ template void PoolReplayer::init_local_pool_watcher(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_local_pool_watcher); m_local_pool_watcher.reset(PoolWatcher::create( m_threads, m_local_io_ctx, m_local_pool_watcher_listener)); @@ -834,7 +834,7 @@ template void PoolReplayer::init_remote_pool_watcher(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_remote_pool_watcher); m_remote_pool_watcher.reset(PoolWatcher::create( m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener)); @@ -872,7 +872,7 @@ template void PoolReplayer::init_image_deleter(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_image_deleter); on_finish = new FunctionContext([this, on_finish](int r) { @@ -898,15 +898,15 @@ void PoolReplayer::handle_init_image_deleter(int r, Context *on_finish) { on_finish->complete(0); - Mutex::Locker locker(m_lock); - m_cond.Signal(); + std::lock_guard locker{m_lock}; + m_cond.notify_all(); } template void PoolReplayer::shut_down_image_deleter(Context* on_finish) { dout(10) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_image_deleter) { Context *ctx = new FunctionContext([this, on_finish](int r) { handle_shut_down_image_deleter(r, on_finish); @@ -926,7 +926,7 @@ void PoolReplayer::handle_shut_down_image_deleter( dout(10) << "r=" << r << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_image_deleter); m_image_deleter.reset(); } @@ -939,7 +939,7 @@ void PoolReplayer::shut_down_pool_watchers(Context *on_finish) { dout(10) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_local_pool_watcher) { Context *ctx = new FunctionContext([this, on_finish](int r) { handle_shut_down_pool_watchers(r, on_finish); @@ -965,7 +965,7 @@ void PoolReplayer::handle_shut_down_pool_watchers( dout(10) << "r=" << r << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_local_pool_watcher); m_local_pool_watcher.reset(); @@ -980,7 +980,7 @@ template void PoolReplayer::wait_for_update_ops(Context *on_finish) { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; Context *ctx = new FunctionContext([this, on_finish](int r) { handle_wait_for_update_ops(r, on_finish); @@ -1003,7 +1003,7 @@ void PoolReplayer::shut_down_image_map(Context *on_finish) { dout(5) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_image_map) { on_finish = new FunctionContext([this, on_finish](int r) { handle_shut_down_image_map(r, on_finish); @@ -1024,7 +1024,7 @@ void PoolReplayer::handle_shut_down_image_map(int r, Context *on_finish) { derr << "failed to shut down image map: " << cpp_strerror(r) << dendl; } - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_image_map); m_image_map.reset(); @@ -1078,7 +1078,7 @@ void PoolReplayer::handle_remove_image(const std::string &mirror_uuid, template void PoolReplayer::handle_instances_added(const InstanceIds &instance_ids) { dout(5) << "instance_ids=" << instance_ids << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (!m_leader_watcher->is_leader()) { return; } @@ -1091,7 +1091,7 @@ template void PoolReplayer::handle_instances_removed( const InstanceIds &instance_ids) { dout(5) << "instance_ids=" << instance_ids << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (!m_leader_watcher->is_leader()) { return; } diff --git a/src/tools/rbd_mirror/PoolReplayer.h b/src/tools/rbd_mirror/PoolReplayer.h index 204d88781b3..c734d9a8809 100644 --- a/src/tools/rbd_mirror/PoolReplayer.h +++ b/src/tools/rbd_mirror/PoolReplayer.h @@ -5,8 +5,7 @@ #define CEPH_RBD_MIRROR_POOL_REPLAYER_H #include "common/AsyncOpTracker.h" -#include "common/Cond.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "common/WorkQueue.h" #include "include/rados/librados.hpp" @@ -220,8 +219,8 @@ private: PeerSpec m_peer; std::vector m_args; - mutable Mutex m_lock; - Cond m_cond; + mutable ceph::mutex m_lock; + ceph::condition_variable m_cond; std::atomic m_stopping = { false }; bool m_manual_stop = false; bool m_blacklisted = false; diff --git a/src/tools/rbd_mirror/PoolWatcher.cc b/src/tools/rbd_mirror/PoolWatcher.cc index b5c62b7b2d6..0ee7be40557 100644 --- a/src/tools/rbd_mirror/PoolWatcher.cc +++ b/src/tools/rbd_mirror/PoolWatcher.cc @@ -71,7 +71,7 @@ template PoolWatcher::PoolWatcher(Threads *threads, librados::IoCtx &remote_io_ctx, pool_watcher::Listener &listener) : m_threads(threads), m_remote_io_ctx(remote_io_ctx), m_listener(listener), - m_lock(librbd::util::unique_lock_name("rbd::mirror::PoolWatcher", this)) { + m_lock(ceph::make_mutex(librbd::util::unique_lock_name("rbd::mirror::PoolWatcher", this))) { m_mirroring_watcher = new MirroringWatcher(m_remote_io_ctx, m_threads->work_queue, this); } @@ -83,7 +83,7 @@ PoolWatcher::~PoolWatcher() { template bool PoolWatcher::is_blacklisted() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_blacklisted; } @@ -92,7 +92,7 @@ void PoolWatcher::init(Context *on_finish) { dout(5) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_on_init_finish = on_finish; ceph_assert(!m_refresh_in_progress); @@ -108,8 +108,7 @@ void PoolWatcher::shut_down(Context *on_finish) { dout(5) << dendl; { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; ceph_assert(!m_shutting_down); m_shutting_down = true; @@ -128,7 +127,7 @@ void PoolWatcher::shut_down(Context *on_finish) { template void PoolWatcher::register_watcher() { { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_image_ids_invalid); ceph_assert(m_refresh_in_progress); } @@ -154,7 +153,7 @@ void PoolWatcher::handle_register_watcher(int r) { dout(5) << "r=" << r << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_image_ids_invalid); ceph_assert(m_refresh_in_progress); if (r < 0) { @@ -168,14 +167,14 @@ void PoolWatcher::handle_register_watcher(int r) { } else if (r == -EBLACKLISTED) { dout(0) << "detected client is blacklisted" << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_blacklisted = true; std::swap(on_init_finish, m_on_init_finish); } else if (r == -ENOENT) { dout(5) << "mirroring directory does not exist" << dendl; schedule_refresh_images(30); - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; std::swap(on_init_finish, m_on_init_finish); } else { derr << "unexpected error registering mirroring directory watch: " @@ -212,7 +211,7 @@ void PoolWatcher::refresh_images() { dout(5) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_image_ids_invalid); ceph_assert(m_refresh_in_progress); @@ -239,7 +238,7 @@ void PoolWatcher::handle_refresh_images(int r) { bool retry_refresh = false; Context *on_init_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_image_ids_invalid); ceph_assert(m_refresh_in_progress); @@ -300,7 +299,7 @@ void PoolWatcher::handle_get_mirror_uuid(int r) { bool retry_refresh = false; Context *on_init_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_image_ids_invalid); ceph_assert(m_refresh_in_progress); m_refresh_in_progress = false; @@ -354,8 +353,7 @@ void PoolWatcher::handle_get_mirror_uuid(int r) { template void PoolWatcher::schedule_refresh_images(double interval) { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (m_shutting_down || m_refresh_in_progress || m_timer_ctx != nullptr) { if (m_refresh_in_progress && !m_deferred_refresh) { dout(5) << "deferring refresh until in-flight refresh completes" << dendl; @@ -379,7 +377,7 @@ void PoolWatcher::handle_rewatch_complete(int r) { if (r == -EBLACKLISTED) { dout(0) << "detected client is blacklisted" << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_blacklisted = true; return; } else if (r == -ENOENT) { @@ -400,7 +398,7 @@ void PoolWatcher::handle_image_updated(const std::string &remote_image_id, << "global_image_id=" << global_image_id << ", " << "enabled=" << enabled << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ImageId image_id(global_image_id, remote_image_id); m_pending_added_image_ids.erase(image_id); m_pending_removed_image_ids.erase(image_id); @@ -416,12 +414,12 @@ void PoolWatcher::handle_image_updated(const std::string &remote_image_id, template void PoolWatcher::process_refresh_images() { - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(m_timer_ctx != nullptr); m_timer_ctx = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_refresh_in_progress); m_refresh_in_progress = true; m_deferred_refresh = false; @@ -438,7 +436,7 @@ void PoolWatcher::process_refresh_images() { template void PoolWatcher::schedule_listener() { - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); m_pending_updates = true; if (m_shutting_down || m_image_ids_invalid || m_notify_listener_in_progress) { return; @@ -464,7 +462,7 @@ void PoolWatcher::notify_listener() { ImageIds added_image_ids; ImageIds removed_image_ids; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_notify_listener_in_progress); // if the mirror uuid is updated, treat it as the removal of all @@ -488,7 +486,7 @@ void PoolWatcher::notify_listener() { } { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_notify_listener_in_progress); // if the watch failed while we didn't own the lock, we are going @@ -537,7 +535,7 @@ void PoolWatcher::notify_listener() { std::move(removed_image_ids)); { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_notify_listener_in_progress = false; if (m_pending_updates) { schedule_listener(); diff --git a/src/tools/rbd_mirror/PoolWatcher.h b/src/tools/rbd_mirror/PoolWatcher.h index 1136a319fca..ec21e343ece 100644 --- a/src/tools/rbd_mirror/PoolWatcher.h +++ b/src/tools/rbd_mirror/PoolWatcher.h @@ -11,7 +11,7 @@ #include "common/AsyncOpTracker.h" #include "common/ceph_context.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "include/rados/librados.hpp" #include "tools/rbd_mirror/Types.h" #include @@ -51,7 +51,7 @@ public: void shut_down(Context *on_finish); inline uint64_t get_image_count() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return m_image_ids.size(); } @@ -109,7 +109,7 @@ private: ImageIds m_refresh_image_ids; bufferlist m_out_bl; - mutable Mutex m_lock; + mutable ceph::mutex m_lock; Context *m_on_init_finish = nullptr; diff --git a/src/tools/rbd_mirror/ServiceDaemon.cc b/src/tools/rbd_mirror/ServiceDaemon.cc index f3b549b860c..b08c6effa90 100644 --- a/src/tools/rbd_mirror/ServiceDaemon.cc +++ b/src/tools/rbd_mirror/ServiceDaemon.cc @@ -52,15 +52,14 @@ using namespace service_daemon; template ServiceDaemon::ServiceDaemon(CephContext *cct, RadosRef rados, Threads* threads) - : m_cct(cct), m_rados(rados), m_threads(threads), - m_lock("rbd::mirror::ServiceDaemon") { + : m_cct(cct), m_rados(rados), m_threads(threads) { dout(20) << dendl; } template ServiceDaemon::~ServiceDaemon() { dout(20) << dendl; - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; if (m_timer_ctx != nullptr) { m_threads->timer->cancel_event(m_timer_ctx); update_status(); @@ -93,7 +92,7 @@ void ServiceDaemon::add_pool(int64_t pool_id, const std::string& pool_name) { dout(20) << "pool_id=" << pool_id << ", pool_name=" << pool_name << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_pools.insert({pool_id, {pool_name}}); } schedule_update_status(); @@ -103,7 +102,7 @@ template void ServiceDaemon::remove_pool(int64_t pool_id) { dout(20) << "pool_id=" << pool_id << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_pools.erase(pool_id); } schedule_update_status(); @@ -120,7 +119,7 @@ uint64_t ServiceDaemon::add_or_update_callout(int64_t pool_id, << "text=" << text << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto pool_it = m_pools.find(pool_id); if (pool_it == m_pools.end()) { return CALLOUT_ID_NONE; @@ -142,7 +141,7 @@ void ServiceDaemon::remove_callout(int64_t pool_id, uint64_t callout_id) { << "callout_id=" << callout_id << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto pool_it = m_pools.find(pool_id); if (pool_it == m_pools.end()) { return; @@ -162,7 +161,7 @@ void ServiceDaemon::add_or_update_attribute(int64_t pool_id, << "value=" << value << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto pool_it = m_pools.find(pool_id); if (pool_it == m_pools.end()) { return; @@ -180,7 +179,7 @@ void ServiceDaemon::remove_attribute(int64_t pool_id, << "key=" << key << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; auto pool_it = m_pools.find(pool_id); if (pool_it == m_pools.end()) { return; @@ -193,7 +192,7 @@ void ServiceDaemon::remove_attribute(int64_t pool_id, template void ServiceDaemon::schedule_update_status() { - Mutex::Locker timer_locker(m_threads->timer_lock); + std::lock_guard timer_locker{m_threads->timer_lock}; if (m_timer_ctx != nullptr) { return; } @@ -208,11 +207,11 @@ void ServiceDaemon::schedule_update_status() { template void ServiceDaemon::update_status() { dout(20) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph::JSONFormatter f; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; f.open_object_section("pools"); for (auto& pool_pair : m_pools) { f.open_object_section(stringify(pool_pair.first).c_str()); diff --git a/src/tools/rbd_mirror/ServiceDaemon.h b/src/tools/rbd_mirror/ServiceDaemon.h index 1de7e20bf5e..68599df8099 100644 --- a/src/tools/rbd_mirror/ServiceDaemon.h +++ b/src/tools/rbd_mirror/ServiceDaemon.h @@ -4,7 +4,7 @@ #ifndef CEPH_RBD_MIRROR_SERVICE_DAEMON_H #define CEPH_RBD_MIRROR_SERVICE_DAEMON_H -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "tools/rbd_mirror/Types.h" #include "tools/rbd_mirror/service_daemon/Types.h" #include @@ -68,7 +68,7 @@ private: RadosRef m_rados; Threads* m_threads; - Mutex m_lock; + ceph::mutex m_lock = ceph::make_mutex("rbd::mirror::ServiceDaemon"); Pools m_pools; uint64_t m_callout_id = service_daemon::CALLOUT_ID_NONE; diff --git a/src/tools/rbd_mirror/Threads.cc b/src/tools/rbd_mirror/Threads.cc index ca0a8b0f93a..2ec0cf2a44d 100644 --- a/src/tools/rbd_mirror/Threads.cc +++ b/src/tools/rbd_mirror/Threads.cc @@ -10,7 +10,7 @@ namespace rbd { namespace mirror { template -Threads::Threads(CephContext *cct) : timer_lock("Threads::timer_lock") { +Threads::Threads(CephContext *cct) { thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", cct->_conf.get_val("rbd_op_threads"), "rbd_op_threads"); @@ -27,7 +27,7 @@ Threads::Threads(CephContext *cct) : timer_lock("Threads::timer_lock") { template Threads::~Threads() { { - Mutex::Locker timer_locker(timer_lock); + std::lock_guard timer_locker{timer_lock}; timer->shutdown(); } delete timer; diff --git a/src/tools/rbd_mirror/Threads.h b/src/tools/rbd_mirror/Threads.h index f52e8837d35..d6a55579f63 100644 --- a/src/tools/rbd_mirror/Threads.h +++ b/src/tools/rbd_mirror/Threads.h @@ -4,7 +4,7 @@ #ifndef CEPH_RBD_MIRROR_THREADS_H #define CEPH_RBD_MIRROR_THREADS_H -#include "common/Mutex.h" +#include "common/ceph_mutex.h" class CephContext; class ContextWQ; @@ -22,7 +22,8 @@ struct Threads { ContextWQ *work_queue = nullptr; SafeTimer *timer = nullptr; - Mutex timer_lock; + ceph::mutex timer_lock = + ceph::make_mutex("Threads::timer_lock"); explicit Threads(CephContext *cct); Threads(const Threads&) = delete; diff --git a/src/tools/rbd_mirror/image_deleter/SnapshotPurgeRequest.cc b/src/tools/rbd_mirror/image_deleter/SnapshotPurgeRequest.cc index 4a59047236e..020db85b877 100644 --- a/src/tools/rbd_mirror/image_deleter/SnapshotPurgeRequest.cc +++ b/src/tools/rbd_mirror/image_deleter/SnapshotPurgeRequest.cc @@ -35,7 +35,7 @@ void SnapshotPurgeRequest::open_image() { m_image_ctx = I::create("", m_image_id, nullptr, m_io_ctx, false); { - RWLock::WLocker image_locker(m_image_ctx->image_lock); + std::unique_lock image_locker{m_image_ctx->image_lock}; m_image_ctx->set_journal_policy(new JournalPolicy()); } @@ -66,9 +66,9 @@ template void SnapshotPurgeRequest::acquire_lock() { dout(10) << dendl; - m_image_ctx->owner_lock.get_read(); + m_image_ctx->owner_lock.lock_shared(); if (m_image_ctx->exclusive_lock == nullptr) { - m_image_ctx->owner_lock.put_read(); + m_image_ctx->owner_lock.unlock_shared(); derr << "exclusive lock not enabled" << dendl; m_ret_val = -EINVAL; @@ -79,7 +79,7 @@ void SnapshotPurgeRequest::acquire_lock() { m_image_ctx->exclusive_lock->acquire_lock(create_context_callback< SnapshotPurgeRequest, &SnapshotPurgeRequest::handle_acquire_lock>( this)); - m_image_ctx->owner_lock.put_read(); + m_image_ctx->owner_lock.unlock_shared(); } template @@ -94,7 +94,7 @@ void SnapshotPurgeRequest::handle_acquire_lock(int r) { } { - RWLock::RLocker image_locker(m_image_ctx->image_lock); + std::shared_lock image_locker{m_image_ctx->image_lock}; m_snaps = m_image_ctx->snaps; } snap_unprotect(); @@ -108,10 +108,10 @@ void SnapshotPurgeRequest::snap_unprotect() { } librados::snap_t snap_id = m_snaps.back(); - m_image_ctx->image_lock.get_read(); + m_image_ctx->image_lock.lock_shared(); int r = m_image_ctx->get_snap_namespace(snap_id, &m_snap_namespace); if (r < 0) { - m_image_ctx->image_lock.put_read(); + m_image_ctx->image_lock.unlock_shared(); derr << "failed to get snap namespace: " << cpp_strerror(r) << dendl; m_ret_val = r; @@ -121,7 +121,7 @@ void SnapshotPurgeRequest::snap_unprotect() { r = m_image_ctx->get_snap_name(snap_id, &m_snap_name); if (r < 0) { - m_image_ctx->image_lock.put_read(); + m_image_ctx->image_lock.unlock_shared(); derr << "failed to get snap name: " << cpp_strerror(r) << dendl; m_ret_val = r; @@ -132,7 +132,7 @@ void SnapshotPurgeRequest::snap_unprotect() { bool is_protected; r = m_image_ctx->is_snap_protected(snap_id, &is_protected); if (r < 0) { - m_image_ctx->image_lock.put_read(); + m_image_ctx->image_lock.unlock_shared(); derr << "failed to get snap protection status: " << cpp_strerror(r) << dendl; @@ -140,7 +140,7 @@ void SnapshotPurgeRequest::snap_unprotect() { close_image(); return; } - m_image_ctx->image_lock.put_read(); + m_image_ctx->image_lock.unlock_shared(); if (!is_protected) { snap_remove(); @@ -163,7 +163,7 @@ void SnapshotPurgeRequest::snap_unprotect() { handle_snap_unprotect(r); finish_op_ctx->complete(0); }); - RWLock::RLocker owner_locker(m_image_ctx->owner_lock); + std::shared_lock owner_locker{m_image_ctx->owner_lock}; m_image_ctx->operations->execute_snap_unprotect( m_snap_namespace, m_snap_name.c_str(), ctx); } @@ -186,7 +186,7 @@ void SnapshotPurgeRequest::handle_snap_unprotect(int r) { { // avoid the need to refresh to delete the newly unprotected snapshot - RWLock::RLocker image_locker(m_image_ctx->image_lock); + std::shared_lock image_locker{m_image_ctx->image_lock}; librados::snap_t snap_id = m_snaps.back(); auto snap_info_it = m_image_ctx->snap_info.find(snap_id); if (snap_info_it != m_image_ctx->snap_info.end()) { @@ -218,7 +218,7 @@ void SnapshotPurgeRequest::snap_remove() { handle_snap_remove(r); finish_op_ctx->complete(0); }); - RWLock::RLocker owner_locker(m_image_ctx->owner_lock); + std::shared_lock owner_locker{m_image_ctx->owner_lock}; m_image_ctx->operations->execute_snap_remove( m_snap_namespace, m_snap_name.c_str(), ctx); } @@ -279,7 +279,7 @@ void SnapshotPurgeRequest::finish(int r) { template Context *SnapshotPurgeRequest::start_lock_op(int* r) { - RWLock::RLocker owner_locker(m_image_ctx->owner_lock); + std::shared_lock owner_locker{m_image_ctx->owner_lock}; return m_image_ctx->exclusive_lock->start_op(r); } diff --git a/src/tools/rbd_mirror/image_deleter/TrashMoveRequest.cc b/src/tools/rbd_mirror/image_deleter/TrashMoveRequest.cc index 857c744c687..82f9bea600b 100644 --- a/src/tools/rbd_mirror/image_deleter/TrashMoveRequest.cc +++ b/src/tools/rbd_mirror/image_deleter/TrashMoveRequest.cc @@ -184,7 +184,7 @@ void TrashMoveRequest::open_image() { { // don't attempt to open the journal - RWLock::WLocker image_locker(m_image_ctx->image_lock); + std::unique_lock image_locker{m_image_ctx->image_lock}; m_image_ctx->set_journal_policy(new JournalPolicy()); } @@ -217,10 +217,10 @@ void TrashMoveRequest::handle_open_image(int r) { template void TrashMoveRequest::acquire_lock() { - m_image_ctx->owner_lock.get_read(); + m_image_ctx->owner_lock.lock_shared(); if (m_image_ctx->exclusive_lock == nullptr) { derr << "exclusive lock feature not enabled" << dendl; - m_image_ctx->owner_lock.put_read(); + m_image_ctx->owner_lock.unlock_shared(); m_ret_val = -EINVAL; close_image(); return; @@ -232,7 +232,7 @@ void TrashMoveRequest::acquire_lock() { TrashMoveRequest, &TrashMoveRequest::handle_acquire_lock>(this); m_image_ctx->exclusive_lock->block_requests(0); m_image_ctx->exclusive_lock->acquire_lock(ctx); - m_image_ctx->owner_lock.put_read(); + m_image_ctx->owner_lock.unlock_shared(); } template diff --git a/src/tools/rbd_mirror/image_deleter/TrashWatcher.cc b/src/tools/rbd_mirror/image_deleter/TrashWatcher.cc index 8735dfb7df5..6d551b56b20 100644 --- a/src/tools/rbd_mirror/image_deleter/TrashWatcher.cc +++ b/src/tools/rbd_mirror/image_deleter/TrashWatcher.cc @@ -36,8 +36,8 @@ TrashWatcher::TrashWatcher(librados::IoCtx &io_ctx, Threads *threads, TrashListener& trash_listener) : librbd::TrashWatcher(io_ctx, threads->work_queue), m_io_ctx(io_ctx), m_threads(threads), m_trash_listener(trash_listener), - m_lock(librbd::util::unique_lock_name( - "rbd::mirror::image_deleter::TrashWatcher", this)) { + m_lock(ceph::make_mutex(librbd::util::unique_lock_name( + "rbd::mirror::image_deleter::TrashWatcher", this))) { } template @@ -45,7 +45,7 @@ void TrashWatcher::init(Context *on_finish) { dout(5) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_on_init_finish = on_finish; ceph_assert(!m_trash_list_in_progress); @@ -60,8 +60,7 @@ void TrashWatcher::shut_down(Context *on_finish) { dout(5) << dendl; { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; ceph_assert(!m_shutting_down); m_shutting_down = true; @@ -82,7 +81,7 @@ void TrashWatcher::handle_image_added(const std::string &image_id, const cls::rbd::TrashImageSpec& spec) { dout(10) << "image_id=" << image_id << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; add_image(image_id, spec); } @@ -111,7 +110,7 @@ template void TrashWatcher::create_trash() { dout(20) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_trash_list_in_progress); } @@ -130,7 +129,7 @@ template void TrashWatcher::handle_create_trash(int r) { dout(20) << "r=" << r << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_trash_list_in_progress); } @@ -142,13 +141,13 @@ void TrashWatcher::handle_create_trash(int r) { dout(0) << "detected pool no longer exists" << dendl; } - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; std::swap(on_init_finish, m_on_init_finish); m_trash_list_in_progress = false; } else if (r < 0 && r != -EEXIST) { derr << "failed to create trash object: " << cpp_strerror(r) << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_trash_list_in_progress = false; } @@ -166,7 +165,7 @@ void TrashWatcher::handle_create_trash(int r) { template void TrashWatcher::register_watcher() { { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_trash_list_in_progress); } @@ -191,7 +190,7 @@ void TrashWatcher::handle_register_watcher(int r) { dout(5) << "r=" << r << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_trash_list_in_progress); if (r < 0) { m_trash_list_in_progress = false; @@ -204,7 +203,7 @@ void TrashWatcher::handle_register_watcher(int r) { } else if (r == -EBLACKLISTED) { dout(0) << "detected client is blacklisted" << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; std::swap(on_init_finish, m_on_init_finish); } else { derr << "unexpected error registering trash directory watch: " @@ -250,7 +249,7 @@ void TrashWatcher::trash_list(bool initial_request) { dout(5) << "last_image_id=" << m_last_image_id << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_trash_list_in_progress); } @@ -277,7 +276,7 @@ void TrashWatcher::handle_trash_list(int r) { Context *on_init_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_trash_list_in_progress); if (r >= 0) { for (auto& image : images) { @@ -316,8 +315,7 @@ void TrashWatcher::handle_trash_list(int r) { template void TrashWatcher::schedule_trash_list(double interval) { - Mutex::Locker timer_locker(m_threads->timer_lock); - Mutex::Locker locker(m_lock); + std::scoped_lock locker{m_threads->timer_lock, m_lock}; if (m_shutting_down || m_trash_list_in_progress || m_timer_ctx != nullptr) { if (m_trash_list_in_progress && !m_deferred_trash_list) { dout(5) << "deferring refresh until in-flight refresh completes" << dendl; @@ -338,12 +336,12 @@ template void TrashWatcher::process_trash_list() { dout(5) << dendl; - ceph_assert(m_threads->timer_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock)); ceph_assert(m_timer_ctx != nullptr); m_timer_ctx = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(!m_trash_list_in_progress); m_trash_list_in_progress = true; } @@ -364,14 +362,15 @@ void TrashWatcher::add_image(const std::string& image_id, return; } - ceph_assert(m_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_lock)); auto& deferment_end_time = spec.deferment_end_time; dout(10) << "image_id=" << image_id << ", " << "deferment_end_time=" << deferment_end_time << dendl; m_async_op_tracker.start_op(); auto ctx = new FunctionContext([this, image_id, deferment_end_time](int r) { - m_trash_listener.handle_trash_image(image_id, deferment_end_time); + m_trash_listener.handle_trash_image(image_id, + deferment_end_time.to_real_time()); m_async_op_tracker.finish_op(); }); m_threads->work_queue->queue(ctx, 0); diff --git a/src/tools/rbd_mirror/image_deleter/TrashWatcher.h b/src/tools/rbd_mirror/image_deleter/TrashWatcher.h index b6f698331aa..e818a102c92 100644 --- a/src/tools/rbd_mirror/image_deleter/TrashWatcher.h +++ b/src/tools/rbd_mirror/image_deleter/TrashWatcher.h @@ -6,7 +6,7 @@ #include "include/rados/librados.hpp" #include "common/AsyncOpTracker.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "librbd/TrashWatcher.h" #include #include @@ -97,7 +97,7 @@ private: std::string m_last_image_id; bufferlist m_out_bl; - mutable Mutex m_lock; + mutable ceph::mutex m_lock; Context *m_on_init_finish = nullptr; Context *m_timer_ctx = nullptr; diff --git a/src/tools/rbd_mirror/image_deleter/Types.h b/src/tools/rbd_mirror/image_deleter/Types.h index ac3bc64afb7..1c70b7e14dd 100644 --- a/src/tools/rbd_mirror/image_deleter/Types.h +++ b/src/tools/rbd_mirror/image_deleter/Types.h @@ -30,7 +30,7 @@ struct TrashListener { } virtual void handle_trash_image(const std::string& image_id, - const utime_t& deferment_end_time) = 0; + const ceph::real_clock::time_point& deferment_end_time) = 0; }; diff --git a/src/tools/rbd_mirror/image_map/Policy.cc b/src/tools/rbd_mirror/image_map/Policy.cc index 6fababddb60..ccf188742a1 100644 --- a/src/tools/rbd_mirror/image_map/Policy.cc +++ b/src/tools/rbd_mirror/image_map/Policy.cc @@ -39,8 +39,8 @@ using librbd::util::unique_lock_name; Policy::Policy(librados::IoCtx &ioctx) : m_ioctx(ioctx), - m_map_lock(unique_lock_name("rbd::mirror::image_map::Policy::m_map_lock", - this)) { + m_map_lock(ceph::make_shared_mutex( + unique_lock_name("rbd::mirror::image_map::Policy::m_map_lock", this))) { // map should at least have once instance std::string instance_id = stringify(ioctx.get_instance_id()); @@ -51,7 +51,7 @@ void Policy::init( const std::map &image_mapping) { dout(20) << dendl; - RWLock::WLocker map_lock(m_map_lock); + std::unique_lock map_lock{m_map_lock}; for (auto& it : image_mapping) { ceph_assert(!it.second.instance_id.empty()); auto map_result = m_map[it.second.instance_id].emplace(it.first); @@ -72,7 +72,7 @@ void Policy::init( LookupInfo Policy::lookup(const std::string &global_image_id) { dout(20) << "global_image_id=" << global_image_id << dendl; - RWLock::RLocker map_lock(m_map_lock); + std::shared_lock map_lock{m_map_lock}; LookupInfo info; auto it = m_image_states.find(global_image_id); @@ -86,7 +86,7 @@ LookupInfo Policy::lookup(const std::string &global_image_id) { bool Policy::add_image(const std::string &global_image_id) { dout(5) << "global_image_id=" << global_image_id << dendl; - RWLock::WLocker map_lock(m_map_lock); + std::unique_lock map_lock{m_map_lock}; auto image_state_result = m_image_states.emplace(global_image_id, ImageState{}); auto& image_state = image_state_result.first->second; @@ -101,7 +101,7 @@ bool Policy::add_image(const std::string &global_image_id) { bool Policy::remove_image(const std::string &global_image_id) { dout(5) << "global_image_id=" << global_image_id << dendl; - RWLock::WLocker map_lock(m_map_lock); + std::unique_lock map_lock{m_map_lock}; auto it = m_image_states.find(global_image_id); if (it == m_image_states.end()) { return false; @@ -115,7 +115,7 @@ void Policy::add_instances(const InstanceIds &instance_ids, GlobalImageIds* global_image_ids) { dout(5) << "instance_ids=" << instance_ids << dendl; - RWLock::WLocker map_lock(m_map_lock); + std::unique_lock map_lock{m_map_lock}; for (auto& instance : instance_ids) { ceph_assert(!instance.empty()); m_map.emplace(instance, std::set{}); @@ -157,14 +157,14 @@ void Policy::add_instances(const InstanceIds &instance_ids, void Policy::remove_instances(const InstanceIds &instance_ids, GlobalImageIds* global_image_ids) { - RWLock::WLocker map_lock(m_map_lock); + std::unique_lock map_lock{m_map_lock}; remove_instances(m_map_lock, instance_ids, global_image_ids); } -void Policy::remove_instances(const RWLock& lock, +void Policy::remove_instances(const ceph::shared_mutex& lock, const InstanceIds &instance_ids, GlobalImageIds* global_image_ids) { - ceph_assert(m_map_lock.is_wlocked()); + ceph_assert(ceph_mutex_is_wlocked(m_map_lock)); dout(5) << "instance_ids=" << instance_ids << dendl; for (auto& instance_id : instance_ids) { @@ -201,7 +201,7 @@ void Policy::remove_instances(const RWLock& lock, } ActionType Policy::start_action(const std::string &global_image_id) { - RWLock::WLocker map_lock(m_map_lock); + std::unique_lock map_lock{m_map_lock}; auto it = m_image_states.find(global_image_id); ceph_assert(it != m_image_states.end()); @@ -222,7 +222,7 @@ ActionType Policy::start_action(const std::string &global_image_id) { } bool Policy::finish_action(const std::string &global_image_id, int r) { - RWLock::WLocker map_lock(m_map_lock); + std::unique_lock map_lock{m_map_lock}; auto it = m_image_states.find(global_image_id); ceph_assert(it != m_image_states.end()); @@ -291,7 +291,7 @@ void Policy::execute_policy_action( } void Policy::map(const std::string& global_image_id, ImageState* image_state) { - ceph_assert(m_map_lock.is_wlocked()); + ceph_assert(ceph_mutex_is_wlocked(m_map_lock)); std::string instance_id = image_state->instance_id; if (instance_id != UNMAPPED_INSTANCE_ID && !is_dead_instance(instance_id)) { @@ -315,7 +315,7 @@ void Policy::map(const std::string& global_image_id, ImageState* image_state) { void Policy::unmap(const std::string &global_image_id, ImageState* image_state) { - ceph_assert(m_map_lock.is_wlocked()); + ceph_assert(ceph_mutex_is_wlocked(m_map_lock)); std::string instance_id = image_state->instance_id; if (instance_id == UNMAPPED_INSTANCE_ID) { @@ -338,7 +338,7 @@ void Policy::unmap(const std::string &global_image_id, } bool Policy::is_image_shuffling(const std::string &global_image_id) { - ceph_assert(m_map_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_map_lock)); auto it = m_image_states.find(global_image_id); ceph_assert(it != m_image_states.end()); @@ -353,7 +353,7 @@ bool Policy::is_image_shuffling(const std::string &global_image_id) { } bool Policy::can_shuffle_image(const std::string &global_image_id) { - ceph_assert(m_map_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_map_lock)); CephContext *cct = reinterpret_cast(m_ioctx.cct()); int migration_throttle = cct->_conf.get_val( diff --git a/src/tools/rbd_mirror/image_map/Policy.h b/src/tools/rbd_mirror/image_map/Policy.h index 590fdbfed88..0617bb9eef0 100644 --- a/src/tools/rbd_mirror/image_map/Policy.h +++ b/src/tools/rbd_mirror/image_map/Policy.h @@ -51,7 +51,7 @@ protected: typedef std::map > InstanceToImageMap; bool is_dead_instance(const std::string instance_id) { - ceph_assert(m_map_lock.is_locked()); + ceph_assert(ceph_mutex_is_locked(m_map_lock)); return m_dead_instances.find(instance_id) != m_dead_instances.end(); } @@ -89,7 +89,7 @@ private: librados::IoCtx &m_ioctx; - RWLock m_map_lock; // protects m_map + ceph::shared_mutex m_map_lock; // protects m_map InstanceToImageMap m_map; // instance_id -> global_id map ImageStates m_image_states; @@ -97,7 +97,8 @@ private: bool m_initial_update = true; - void remove_instances(const RWLock& lock, const InstanceIds &instance_ids, + void remove_instances(const ceph::shared_mutex& lock, + const InstanceIds &instance_ids, GlobalImageIds* global_image_ids); bool set_state(ImageState* image_state, StateTransition::State state, diff --git a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc index a97cc0fcd33..dd66f9d964a 100644 --- a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc +++ b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.cc @@ -66,7 +66,7 @@ BootstrapRequest::BootstrapRequest( m_remote_mirror_uuid(remote_mirror_uuid), m_journaler(journaler), m_client_state(client_state), m_client_meta(client_meta), m_progress_ctx(progress_ctx), m_do_resync(do_resync), - m_lock(unique_lock_name("BootstrapRequest::m_lock", this)) { + m_lock(ceph::make_mutex(unique_lock_name("BootstrapRequest::m_lock", this))) { dout(10) << dendl; } @@ -77,7 +77,7 @@ BootstrapRequest::~BootstrapRequest() { template bool BootstrapRequest::is_syncing() const { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; return (m_image_sync != nullptr); } @@ -92,7 +92,7 @@ template void BootstrapRequest::cancel() { dout(10) << dendl; - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_canceled = true; if (m_image_sync != nullptr) { @@ -311,9 +311,9 @@ void BootstrapRequest::handle_open_local_image(int r) { I *local_image_ctx = (*m_local_image_ctx); { - local_image_ctx->image_lock.get_read(); + local_image_ctx->image_lock.lock_shared(); if (local_image_ctx->journal == nullptr) { - local_image_ctx->image_lock.put_read(); + local_image_ctx->image_lock.unlock_shared(); derr << "local image does not support journaling" << dendl; m_ret_val = -EINVAL; @@ -323,7 +323,7 @@ void BootstrapRequest::handle_open_local_image(int r) { r = (*m_local_image_ctx)->journal->is_resync_requested(m_do_resync); if (r < 0) { - local_image_ctx->image_lock.put_read(); + local_image_ctx->image_lock.unlock_shared(); derr << "failed to check if a resync was requested" << dendl; m_ret_val = r; @@ -335,7 +335,7 @@ void BootstrapRequest::handle_open_local_image(int r) { m_local_tag_data = local_image_ctx->journal->get_tag_data(); dout(10) << "local tag=" << m_local_tag_tid << ", " << "local tag data=" << m_local_tag_data << dendl; - local_image_ctx->image_lock.put_read(); + local_image_ctx->image_lock.unlock_shared(); } if (m_local_tag_data.mirror_uuid != m_remote_mirror_uuid && !m_primary) { @@ -481,9 +481,9 @@ void BootstrapRequest::create_local_image() { dout(15) << "local_image_id=" << m_local_image_id << dendl; update_progress("CREATE_LOCAL_IMAGE"); - m_remote_image_ctx->image_lock.get_read(); + m_remote_image_ctx->image_lock.lock_shared(); std::string image_name = m_remote_image_ctx->name; - m_remote_image_ctx->image_lock.put_read(); + m_remote_image_ctx->image_lock.unlock_shared(); Context *ctx = create_context_callback< BootstrapRequest, &BootstrapRequest::handle_create_local_image>( @@ -664,7 +664,7 @@ void BootstrapRequest::image_sync() { } { - Mutex::Locker locker(m_lock); + std::unique_lock locker{m_lock}; if (m_canceled) { m_ret_val = -ECANCELED; } else { @@ -681,9 +681,9 @@ void BootstrapRequest::image_sync() { m_image_sync->get(); - m_lock.Unlock(); + locker.unlock(); update_progress("IMAGE_SYNC"); - m_lock.Lock(); + locker.lock(); m_image_sync->send(); return; @@ -699,7 +699,7 @@ void BootstrapRequest::handle_image_sync(int r) { dout(15) << "r=" << r << dendl; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; m_image_sync->put(); m_image_sync = nullptr; diff --git a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.h b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.h index ea9f856523f..cc55040a825 100644 --- a/src/tools/rbd_mirror/image_replayer/BootstrapRequest.h +++ b/src/tools/rbd_mirror/image_replayer/BootstrapRequest.h @@ -6,7 +6,7 @@ #include "include/int_types.h" #include "include/rados/librados.hpp" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "cls/journal/cls_journal_types.h" #include "librbd/journal/Types.h" #include "librbd/journal/TypeTraits.h" @@ -17,7 +17,6 @@ class Context; class ContextWQ; -class Mutex; class SafeTimer; namespace journal { class Journaler; } namespace librbd { class ImageCtx; } @@ -163,7 +162,7 @@ private: ProgressContext *m_progress_ctx; bool *m_do_resync; - mutable Mutex m_lock; + mutable ceph::mutex m_lock; bool m_canceled = false; Tags m_remote_tags; diff --git a/src/tools/rbd_mirror/image_replayer/CreateImageRequest.cc b/src/tools/rbd_mirror/image_replayer/CreateImageRequest.cc index e54a76e0733..034f24f77df 100644 --- a/src/tools/rbd_mirror/image_replayer/CreateImageRequest.cc +++ b/src/tools/rbd_mirror/image_replayer/CreateImageRequest.cc @@ -73,7 +73,7 @@ void CreateImageRequest::create_image() { Context *ctx = create_context_callback< klass, &klass::handle_create_image>(this); - RWLock::RLocker image_locker(m_remote_image_ctx->image_lock); + std::shared_lock image_locker{m_remote_image_ctx->image_lock}; auto& config{ reinterpret_cast(m_local_io_ctx.cct())->_conf}; @@ -323,7 +323,7 @@ void CreateImageRequest::clone_image() { std::string snap_name; cls::rbd::SnapshotNamespace snap_namespace; { - RWLock::RLocker remote_image_locker(m_remote_parent_image_ctx->image_lock); + std::shared_lock remote_image_locker(m_remote_parent_image_ctx->image_lock); auto it = m_remote_parent_image_ctx->snap_info.find( m_remote_parent_spec.snap_id); if (it != m_remote_parent_image_ctx->snap_info.end()) { @@ -406,8 +406,8 @@ void CreateImageRequest::finish(int r) { template int CreateImageRequest::validate_parent() { - RWLock::RLocker owner_locker(m_remote_image_ctx->owner_lock); - RWLock::RLocker image_locker(m_remote_image_ctx->image_lock); + std::shared_lock owner_locker{m_remote_image_ctx->owner_lock}; + std::shared_lock image_locker{m_remote_image_ctx->image_lock}; m_remote_parent_spec = m_remote_image_ctx->parent_md.spec; diff --git a/src/tools/rbd_mirror/image_replayer/EventPreprocessor.cc b/src/tools/rbd_mirror/image_replayer/EventPreprocessor.cc index aebf1c3d95d..9034ca4f642 100644 --- a/src/tools/rbd_mirror/image_replayer/EventPreprocessor.cc +++ b/src/tools/rbd_mirror/image_replayer/EventPreprocessor.cc @@ -174,7 +174,7 @@ template bool EventPreprocessor::prune_snap_map(SnapSeqs *snap_seqs) { bool pruned = false; - RWLock::RLocker image_locker(m_local_image_ctx.image_lock); + std::shared_lock image_locker{m_local_image_ctx.image_lock}; for (auto it = snap_seqs->begin(); it != snap_seqs->end(); ) { auto current_it(it++); if (m_local_image_ctx.snap_info.count(current_it->second) == 0) { diff --git a/src/tools/rbd_mirror/image_replayer/OpenLocalImageRequest.cc b/src/tools/rbd_mirror/image_replayer/OpenLocalImageRequest.cc index 35ca37a896b..cc415efa2df 100644 --- a/src/tools/rbd_mirror/image_replayer/OpenLocalImageRequest.cc +++ b/src/tools/rbd_mirror/image_replayer/OpenLocalImageRequest.cc @@ -45,8 +45,8 @@ struct MirrorExclusiveLockPolicy : public librbd::exclusive_lock::Policy { int lock_requested(bool force) override { int r = -EROFS; { - RWLock::RLocker owner_locker(image_ctx->owner_lock); - RWLock::RLocker image_locker(image_ctx->image_lock); + std::shared_lock owner_locker{image_ctx->owner_lock}; + std::shared_lock image_locker{image_ctx->image_lock}; if (image_ctx->journal == nullptr || image_ctx->journal->is_tag_owner()) { r = 0; } @@ -107,8 +107,8 @@ void OpenLocalImageRequest::send_open_image() { *m_local_image_ctx = I::create("", m_local_image_id, nullptr, m_local_io_ctx, false); { - RWLock::WLocker owner_locker((*m_local_image_ctx)->owner_lock); - RWLock::WLocker image_locker((*m_local_image_ctx)->image_lock); + std::scoped_lock locker{(*m_local_image_ctx)->owner_lock, + (*m_local_image_ctx)->image_lock}; (*m_local_image_ctx)->set_exclusive_lock_policy( new MirrorExclusiveLockPolicy(*m_local_image_ctx)); (*m_local_image_ctx)->set_journal_policy( @@ -183,7 +183,7 @@ template void OpenLocalImageRequest::send_lock_image() { dout(20) << dendl; - RWLock::RLocker owner_locker((*m_local_image_ctx)->owner_lock); + std::shared_lock owner_locker{(*m_local_image_ctx)->owner_lock}; if ((*m_local_image_ctx)->exclusive_lock == nullptr) { derr << ": image does not support exclusive lock" << dendl; send_close_image(-EINVAL); @@ -212,7 +212,7 @@ void OpenLocalImageRequest::handle_lock_image(int r) { } { - RWLock::RLocker owner_locker((*m_local_image_ctx)->owner_lock); + std::shared_lock owner_locker{(*m_local_image_ctx)->owner_lock}; if ((*m_local_image_ctx)->exclusive_lock == nullptr || !(*m_local_image_ctx)->exclusive_lock->is_lock_owner()) { derr << ": image is not locked" << dendl; diff --git a/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc index f514d7495ef..b0e90735933 100644 --- a/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc +++ b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.cc @@ -27,7 +27,7 @@ ReplayStatusFormatter::ReplayStatusFormatter(Journaler *journaler, const std::string &mirror_uuid) : m_journaler(journaler), m_mirror_uuid(mirror_uuid), - m_lock(unique_lock_name("ReplayStatusFormatter::m_lock", this)) { + m_lock(ceph::make_mutex(unique_lock_name("ReplayStatusFormatter::m_lock", this))) { } template @@ -37,7 +37,7 @@ bool ReplayStatusFormatter::get_or_send_update(std::string *description, bool in_progress = false; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; if (m_on_finish) { in_progress = true; } else { @@ -88,7 +88,7 @@ bool ReplayStatusFormatter::get_or_send_update(std::string *description, format(description); { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; ceph_assert(m_on_finish == on_finish); m_on_finish = nullptr; } @@ -158,7 +158,7 @@ void ReplayStatusFormatter::send_update_tag_cache(uint64_t master_tag_tid, m_tag_cache.find(master_tag_tid) != m_tag_cache.end()) { Context *on_finish = nullptr; { - Mutex::Locker locker(m_lock); + std::lock_guard locker{m_lock}; std::swap(m_on_finish, on_finish); } diff --git a/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h index 59940a651fb..f1e9bf5656b 100644 --- a/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h +++ b/src/tools/rbd_mirror/image_replayer/ReplayStatusFormatter.h @@ -5,7 +5,7 @@ #define RBD_MIRROR_IMAGE_REPLAYER_REPLAY_STATUS_FORMATTER_H #include "include/Context.h" -#include "common/Mutex.h" +#include "common/ceph_mutex.h" #include "cls/journal/cls_journal_types.h" #include "librbd/journal/Types.h" #include "librbd/journal/TypeTraits.h" @@ -38,7 +38,7 @@ public: private: Journaler *m_journaler; std::string m_mirror_uuid; - Mutex m_lock; + ceph::mutex m_lock; Context *m_on_finish = nullptr; cls::journal::ObjectPosition m_master_position; cls::journal::ObjectPosition m_mirror_position; diff --git a/src/tools/rbd_mirror/image_sync/SyncPointPruneRequest.cc b/src/tools/rbd_mirror/image_sync/SyncPointPruneRequest.cc index 3da2987271e..73e17abb79f 100644 --- a/src/tools/rbd_mirror/image_sync/SyncPointPruneRequest.cc +++ b/src/tools/rbd_mirror/image_sync/SyncPointPruneRequest.cc @@ -57,7 +57,7 @@ void SyncPointPruneRequest::send() { } else { // if we have more than one sync point or invalid sync points, // trim them off - RWLock::RLocker image_locker(m_remote_image_ctx->image_lock); + std::shared_lock image_locker{m_remote_image_ctx->image_lock}; std::set snap_names; for (auto it = m_client_meta_copy.sync_points.rbegin(); it != m_client_meta_copy.sync_points.rend(); ++it) { -- 2.39.5