From 9f0f8f3c629e96cb39f94ab38407c9022d5fa3d4 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 23 May 2018 16:35:42 -0400 Subject: [PATCH] common: SharedMutex uses ref-counted implementation add a reference-counted SharedMutexImpl so that lock guards can outlive the SharedMutex itself. this is required because the lock guards are passed with async completions, and there is no guarantee that the executor will process those completions before the SharedMutex destructs. this case is exercised by the async_destruct unit test Fixes: http://tracker.ceph.com/issues/24124 Signed-off-by: Casey Bodley --- src/common/async/detail/shared_lock.h | 185 ++++++++++++++ src/common/async/detail/shared_mutex.h | 323 +++++++++++++++++++++++++ src/common/async/shared_mutex.h | 278 +++------------------ 3 files changed, 539 insertions(+), 247 deletions(-) create mode 100644 src/common/async/detail/shared_lock.h create mode 100644 src/common/async/detail/shared_mutex.h diff --git a/src/common/async/detail/shared_lock.h b/src/common/async/detail/shared_lock.h new file mode 100644 index 00000000000..12e6a9220ec --- /dev/null +++ b/src/common/async/detail/shared_lock.h @@ -0,0 +1,185 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +namespace std { + +// specialize unique_lock and shared_lock for SharedMutex to operate on +// SharedMutexImpl instead, because the locks may outlive the SharedMutex itself + +template +class unique_lock> { + public: + using mutex_type = boost::intrusive_ptr; + + unique_lock() = default; + explicit unique_lock(ceph::async::SharedMutex& m) + : impl(m.impl), locked(true) + { + impl->lock(); + } + unique_lock(ceph::async::SharedMutex& m, defer_lock_t t) noexcept + : impl(m.impl) + {} + unique_lock(ceph::async::SharedMutex& m, try_to_lock_t t) + : impl(m.impl), locked(impl->try_lock()) + {} + unique_lock(ceph::async::SharedMutex& m, adopt_lock_t t) noexcept + : impl(m.impl), locked(true) + {} + ~unique_lock() { + if (impl && locked) + impl->unlock(); + } + + unique_lock(unique_lock&& other) noexcept + : impl(std::move(other.impl)), + locked(other.locked) { + other.locked = false; + } + unique_lock& operator=(unique_lock&& other) noexcept { + if (impl && locked) { + impl->unlock(); + } + impl = std::move(other.impl); + locked = other.locked; + other.locked = false; + return *this; + } + void swap(unique_lock& other) noexcept { + using std::swap; + swap(impl, other.impl); + swap(locked, other.locked); + } + + mutex_type mutex() const noexcept { return impl; } + bool owns_lock() const noexcept { return impl && locked; } + explicit operator bool() const noexcept { return impl && locked; } + + mutex_type release() { + auto result = std::move(impl); + locked = false; + return result; + } + + void lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + impl->lock(); + locked = true; + } + bool try_lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + return locked = impl->try_lock(); + } + void unlock() { + if (!impl || !locked) + throw system_error(make_error_code(errc::operation_not_permitted)); + impl->unlock(); + locked = false; + } + private: + mutex_type impl; + bool locked{false}; +}; + +template +class shared_lock> { + public: + using mutex_type = boost::intrusive_ptr; + + shared_lock() = default; + explicit shared_lock(ceph::async::SharedMutex& m) + : impl(m.impl), locked(true) + { + impl->lock_shared(); + } + shared_lock(ceph::async::SharedMutex& m, defer_lock_t t) noexcept + : impl(m.impl) + {} + shared_lock(ceph::async::SharedMutex& m, try_to_lock_t t) + : impl(m.impl), locked(impl->try_lock_shared()) + {} + shared_lock(ceph::async::SharedMutex& m, adopt_lock_t t) noexcept + : impl(m.impl), locked(true) + {} + + ~shared_lock() { + if (impl && locked) + impl->unlock_shared(); + } + + shared_lock(shared_lock&& other) noexcept + : impl(std::move(other.impl)), + locked(other.locked) { + other.locked = false; + } + shared_lock& operator=(shared_lock&& other) noexcept { + if (impl && locked) { + impl->unlock_shared(); + } + impl = std::move(other.impl); + locked = other.locked; + other.locked = false; + return *this; + } + void swap(shared_lock& other) noexcept { + using std::swap; + swap(impl, other.impl); + swap(locked, other.locked); + } + + mutex_type mutex() const noexcept { return impl; } + bool owns_lock() const noexcept { return impl && locked; } + explicit operator bool() const noexcept { return impl && locked; } + + mutex_type release() { + auto result = std::move(impl); + locked = false; + return result; + } + + void lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + impl->lock_shared(); + locked = true; + } + bool try_lock() { + if (!impl) + throw system_error(make_error_code(errc::operation_not_permitted)); + if (locked) + throw system_error(make_error_code(errc::resource_deadlock_would_occur)); + return locked = impl->try_lock_shared(); + } + void unlock() { + if (!impl || !locked) + throw system_error(make_error_code(errc::operation_not_permitted)); + impl->unlock_shared(); + locked = false; + } + private: + mutex_type impl; + bool locked{false}; +}; + +} // namespace std diff --git a/src/common/async/detail/shared_mutex.h b/src/common/async/detail/shared_mutex.h new file mode 100644 index 00000000000..c44142d6610 --- /dev/null +++ b/src/common/async/detail/shared_mutex.h @@ -0,0 +1,323 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include // for std::shared_lock + +#include +#include +#include + +#include "common/async/completion.h" + +namespace ceph::async::detail { + +struct LockRequest : public boost::intrusive::list_base_hook<> { + virtual ~LockRequest() {} + virtual void complete(boost::system::error_code ec) = 0; + virtual void destroy() = 0; +}; + +class SharedMutexImpl : public boost::intrusive_ref_counter { + public: + ~SharedMutexImpl(); + + template + auto async_lock(Mutex& mtx, CompletionToken&& token); + void lock(); + void lock(boost::system::error_code& ec); + bool try_lock(); + void unlock(); + template + auto async_lock_shared(Mutex& mtx, CompletionToken&& token); + void lock_shared(); + void lock_shared(boost::system::error_code& ec); + bool try_lock_shared(); + void unlock_shared(); + void cancel(); + + private: + using RequestList = boost::intrusive::list; + + RequestList shared_queue; //< requests waiting on a shared lock + RequestList exclusive_queue; //< requests waiting on an exclusive lock + + /// lock state encodes the number of shared lockers, or 'max' for exclusive + using LockState = uint16_t; + static constexpr LockState Unlocked = 0; + static constexpr LockState Exclusive = std::numeric_limits::max(); + static constexpr LockState MaxShared = Exclusive - 1; + LockState state = Unlocked; //< current lock state + + std::mutex mutex; //< protects lock state and wait queues + + void complete(RequestList&& requests, boost::system::error_code ec); +}; + +// sync requests live on the stack and wait on a condition variable +class SyncRequest : public LockRequest { + std::condition_variable cond; + std::optional ec; + public: + boost::system::error_code wait(std::unique_lock& lock) { + // return the error code once its been set + cond.wait(lock, [this] { return ec; }); + return *ec; + } + void complete(boost::system::error_code ec) override { + this->ec = ec; + cond.notify_one(); + } + void destroy() override { + // nothing, SyncRequests live on the stack + } +}; + +// async requests use async::Completion to invoke a handler on its executor +template typename Lock> +class AsyncRequest : public LockRequest { + Mutex& mutex; //< mutex argument for lock guard + public: + explicit AsyncRequest(Mutex& mutex) : mutex(mutex) {} + + using Signature = void(boost::system::error_code, Lock); + using LockCompletion = Completion>; + + void complete(boost::system::error_code ec) override { + auto r = static_cast(this); + // pass ownership of ourselves to post(). on error, pass an empty lock + post(std::unique_ptr{r}, ec, + ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock}); + } + void destroy() override { + delete static_cast(this); + } +}; + +inline SharedMutexImpl::~SharedMutexImpl() +{ + assert(state == Unlocked); + assert(shared_queue.empty()); + assert(exclusive_queue.empty()); +} + +template +auto SharedMutexImpl::async_lock(Mutex& mtx, CompletionToken&& token) +{ + using Request = AsyncRequest; + using Signature = typename Request::Signature; + boost::asio::async_completion init(token); + auto& handler = init.completion_handler; + auto ex1 = mtx.get_executor(); + { + std::lock_guard lock{mutex}; + + boost::system::error_code ec; + if (state == Unlocked) { + state = Exclusive; + + // post a successful completion + auto ex2 = boost::asio::get_associated_executor(handler, ex1); + auto alloc2 = boost::asio::get_associated_allocator(handler); + auto b = bind_handler(std::move(handler), ec, + std::unique_lock{mtx, std::adopt_lock}); + ex2.post(forward_handler(std::move(b)), alloc2); + } else { + // create a request and add it to the exclusive list + using LockCompletion = typename Request::LockCompletion; + auto request = LockCompletion::create(ex1, std::move(handler), mtx); + exclusive_queue.push_back(*request.release()); + } + } + return init.result.get(); +} + +inline void SharedMutexImpl::lock() +{ + boost::system::error_code ec; + lock(ec); + if (ec) { + throw boost::system::system_error(ec); + } +} + +void SharedMutexImpl::lock(boost::system::error_code& ec) +{ + std::unique_lock lock{mutex}; + + if (state == Unlocked) { + state = Exclusive; + ec.clear(); + } else { + SyncRequest request; + exclusive_queue.push_back(request); + ec = request.wait(lock); + } +} + +inline bool SharedMutexImpl::try_lock() +{ + std::lock_guard lock{mutex}; + + if (state == Unlocked) { + state = Exclusive; + return true; + } + return false; +} + +void SharedMutexImpl::unlock() +{ + RequestList granted; + { + std::lock_guard lock{mutex}; + assert(state == Exclusive); + + if (!exclusive_queue.empty()) { + // grant next exclusive lock + auto& request = exclusive_queue.front(); + exclusive_queue.pop_front(); + granted.push_back(request); + } else { + // grant shared locks, if any + state = shared_queue.size(); + if (state > MaxShared) { + state = MaxShared; + auto end = std::next(shared_queue.begin(), MaxShared); + granted.splice(granted.end(), shared_queue, + shared_queue.begin(), end, MaxShared); + } else { + granted.splice(granted.end(), shared_queue); + } + } + } + complete(std::move(granted), boost::system::error_code{}); +} + +template +auto SharedMutexImpl::async_lock_shared(Mutex& mtx, CompletionToken&& token) +{ + using Request = AsyncRequest; + using Signature = typename Request::Signature; + boost::asio::async_completion init(token); + auto& handler = init.completion_handler; + auto ex1 = mtx.get_executor(); + { + std::lock_guard lock{mutex}; + + boost::system::error_code ec; + if (exclusive_queue.empty() && state < MaxShared) { + state++; + + auto ex2 = boost::asio::get_associated_executor(handler, ex1); + auto alloc2 = boost::asio::get_associated_allocator(handler); + auto b = bind_handler(std::move(handler), ec, + std::shared_lock{mtx, std::adopt_lock}); + ex2.post(forward_handler(std::move(b)), alloc2); + } else { + using LockCompletion = typename Request::LockCompletion; + auto request = LockCompletion::create(ex1, std::move(handler), mtx); + shared_queue.push_back(*request.release()); + } + } + return init.result.get(); +} + +inline void SharedMutexImpl::lock_shared() +{ + boost::system::error_code ec; + lock_shared(ec); + if (ec) { + throw boost::system::system_error(ec); + } +} + +void SharedMutexImpl::lock_shared(boost::system::error_code& ec) +{ + std::unique_lock lock{mutex}; + + if (exclusive_queue.empty() && state < MaxShared) { + state++; + ec.clear(); + } else { + SyncRequest request; + shared_queue.push_back(request); + ec = request.wait(lock); + } +} + +inline bool SharedMutexImpl::try_lock_shared() +{ + std::lock_guard lock{mutex}; + + if (exclusive_queue.empty() && state < MaxShared) { + state++; + return true; + } + return false; +} + +inline void SharedMutexImpl::unlock_shared() +{ + std::lock_guard lock{mutex}; + assert(state != Unlocked && state <= MaxShared); + + if (state == 1 && !exclusive_queue.empty()) { + // grant next exclusive lock + state = Exclusive; + auto& request = exclusive_queue.front(); + exclusive_queue.pop_front(); + request.complete(boost::system::error_code{}); + } else if (state == MaxShared && !shared_queue.empty() && + exclusive_queue.empty()) { + // grant next shared lock + auto& request = shared_queue.front(); + shared_queue.pop_front(); + request.complete(boost::system::error_code{}); + } else { + state--; + } +} + +inline void SharedMutexImpl::cancel() +{ + RequestList canceled; + { + std::lock_guard lock{mutex}; + canceled.splice(canceled.end(), shared_queue); + canceled.splice(canceled.end(), exclusive_queue); + } + complete(std::move(canceled), boost::asio::error::operation_aborted); +} + +void SharedMutexImpl::complete(RequestList&& requests, + boost::system::error_code ec) +{ + while (!requests.empty()) { + auto& request = requests.front(); + requests.pop_front(); + try { + request.complete(ec); + } catch (...) { + // clean up any remaining completions and rethrow + requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); }); + throw; + } + } +} + +} // namespace ceph::async::detail diff --git a/src/common/async/shared_mutex.h b/src/common/async/shared_mutex.h index f71928fd833..3e471a4df6f 100644 --- a/src/common/async/shared_mutex.h +++ b/src/common/async/shared_mutex.h @@ -12,16 +12,9 @@ * */ -#ifndef CEPH_ASYNC_SHARED_MUTEX_H -#define CEPH_ASYNC_SHARED_MUTEX_H +#pragma once -#include -#include -#include // for std::shared_lock - -#include - -#include "common/async/completion.h" +#include "common/async/detail/shared_mutex.h" namespace ceph::async { @@ -62,13 +55,13 @@ namespace ceph::async { template class SharedMutex { public: - SharedMutex(const Executor& ex1); + explicit SharedMutex(const Executor& ex); /// on destruction, all pending lock requests are canceled ~SharedMutex(); using executor_type = Executor; - executor_type get_executor() const noexcept { return ex1; } + executor_type get_executor() const noexcept { return ex; } /// initiate an asynchronous request for an exclusive lock. when the lock is /// granted, the completion handler is invoked with a successful error code @@ -121,92 +114,26 @@ class SharedMutex { void cancel(); private: - Executor ex1; //< default callback executor - - struct LockRequest : public boost::intrusive::list_base_hook<> { - virtual ~LockRequest() {} - virtual void complete(boost::system::error_code ec) = 0; - virtual void destroy() = 0; - }; - using RequestList = boost::intrusive::list; - - RequestList shared_queue; //< requests waiting on a shared lock - RequestList exclusive_queue; //< requests waiting on an exclusive lock - - /// lock state encodes the number of shared lockers, or 'max' for exclusive - using LockState = uint16_t; - static constexpr LockState Unlocked = 0; - static constexpr LockState Exclusive = std::numeric_limits::max(); - static constexpr LockState MaxShared = Exclusive - 1; - LockState state = Unlocked; //< current lock state - - std::mutex mutex; //< protects lock state and wait queues - - // sync requests live on the stack and wait on a condition variable - class SyncRequest; - - // async requests use async::Completion to invoke a handler on its executor - template