+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2018 Red Hat
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_ASYNC_SHARED_MUTEX_H
-#define CEPH_ASYNC_SHARED_MUTEX_H
-
-#include <condition_variable>
-#include <mutex>
-#include <shared_mutex> // for std::shared_lock
-
-#include <boost/intrusive/list.hpp>
-
-#include "common/async/completion.h"
-
-namespace ceph::async {
-
-/**
- * An asynchronous shared mutex for use with boost::asio.
- *
- * A shared mutex class with asynchronous lock operations that complete on a
- * boost::asio executor. The class also has synchronous interfaces that meet
- * most of the standard library's requirements for the SharedMutex concept,
- * which makes it compatible with lock_guard, unique_lock, and shared_lock.
- *
- * All lock requests can fail with operation_aborted on cancel() or destruction.
- * The non-error_code overloads of lock() and lock_shared() will throw this
- * error as an exception of type boost::system::system_error.
- *
- * Exclusive locks are prioritized over shared locks. Locks of the same type
- * are granted in fifo order. The implementation defines a limit on the number
- * of shared locks to 65534 at a time.
- *
- * Example use:
- *
- * boost::asio::io_context context;
- * SharedMutex mutex{context.get_executor()};
- *
- * mutex.async_lock([&] (boost::system::error_code ec, auto lock) {
- * if (!ec) {
- * // mutate shared state ...
- * }
- * });
- * mutex.async_lock_shared([&] (boost::system::error_code ec, auto lock) {
- * if (!ec) {
- * // read shared state ...
- * }
- * });
- *
- * context.run();
- */
-template <typename Executor>
-class SharedMutex {
- public:
- SharedMutex(const Executor& ex1);
-
- /// on destruction, all pending lock requests are canceled
- ~SharedMutex();
-
- using executor_type = Executor;
- executor_type get_executor() const noexcept { return ex1; }
-
- /// initiate an asynchronous request for an exclusive lock. when the lock is
- /// granted, the completion handler is invoked with a successful error code
- /// and a std::unique_lock that owns this mutex.
- /// Signature = void(boost::system::error_code, std::unique_lock)
- template <typename CompletionToken>
- auto async_lock(CompletionToken&& token);
-
- /// wait synchronously for an exclusive lock. if an error occurs before the
- /// lock is granted, that error is thrown as an exception
- void lock();
-
- /// wait synchronously for an exclusive lock. if an error occurs before the
- /// lock is granted, that error is assigned to 'ec'
- void lock(boost::system::error_code& ec);
-
- /// try to acquire an exclusive lock. if the lock is not immediately
- /// available, returns false
- bool try_lock();
-
- /// releases an exclusive lock. not required to be called from the same thread
- /// that initiated the lock
- void unlock();
-
- /// initiate an asynchronous request for a shared lock. when the lock is
- /// granted, the completion handler is invoked with a successful error code
- /// and a std::shared_lock that owns this mutex.
- /// Signature = void(boost::system::error_code, std::shared_lock)
- template <typename CompletionToken>
- auto async_lock_shared(CompletionToken&& token);
-
- /// wait synchronously for a shared lock. if an error occurs before the
- /// lock is granted, that error is thrown as an exception
- void lock_shared();
-
- /// wait synchronously for a shared lock. if an error occurs before the lock
- /// is granted, that error is assigned to 'ec'
- void lock_shared(boost::system::error_code& ec);
-
- /// try to acquire a shared lock. if the lock is not immediately available,
- /// returns false
- bool try_lock_shared();
-
- /// releases a shared lock. not required to be called from the same thread
- /// that initiated the lock
- void unlock_shared();
-
- /// cancel any pending requests for exclusive or shared locks with an
- /// operation_aborted error
- void cancel();
-
- private:
- Executor ex1; //< default callback executor
-
- struct LockRequest : public boost::intrusive::list_base_hook<> {
- virtual ~LockRequest() {}
- virtual void complete(boost::system::error_code ec) = 0;
- virtual void destroy() = 0;
- };
- using RequestList = boost::intrusive::list<LockRequest>;
-
- RequestList shared_queue; //< requests waiting on a shared lock
- RequestList exclusive_queue; //< requests waiting on an exclusive lock
-
- /// lock state encodes the number of shared lockers, or 'max' for exclusive
- using LockState = uint16_t;
- static constexpr LockState Unlocked = 0;
- static constexpr LockState Exclusive = std::numeric_limits<LockState>::max();
- static constexpr LockState MaxShared = Exclusive - 1;
- LockState state = Unlocked; //< current lock state
-
- std::mutex mutex; //< protects lock state and wait queues
-
- // sync requests live on the stack and wait on a condition variable
- class SyncRequest;
-
- // async requests use async::Completion to invoke a handler on its executor
- template <template <typename Mutex> typename Lock>
- class AsyncRequest;
-
- using AsyncExclusiveRequest = AsyncRequest<std::unique_lock>;
- using AsyncSharedRequest = AsyncRequest<std::shared_lock>;
-
- void complete(RequestList&& requests, boost::system::error_code ec);
-};
-
-template <typename Executor>
-class SharedMutex<Executor>::SyncRequest : public LockRequest {
- std::condition_variable cond;
- std::optional<boost::system::error_code> ec;
- public:
- boost::system::error_code wait(std::unique_lock<std::mutex>& lock) {
- // return the error code once its been set
- cond.wait(lock, [this] { return ec; });
- return *ec;
- }
- void complete(boost::system::error_code ec) override {
- this->ec = ec;
- cond.notify_one();
- }
- void destroy() override {
- // nothing, SyncRequests live on the stack
- }
-};
-
-template <typename Executor>
-template <template <typename Mutex> typename Lock>
-class SharedMutex<Executor>::AsyncRequest : public LockRequest {
- SharedMutex& mutex; //< mutex argument for lock guard
- public:
- AsyncRequest(SharedMutex& mutex) : mutex(mutex) {}
-
- using Signature = void(boost::system::error_code, Lock<SharedMutex>);
- using LockCompletion = Completion<Signature, AsBase<AsyncRequest>>;
-
- void complete(boost::system::error_code ec) override {
- auto r = static_cast<LockCompletion*>(this);
- // pass ownership of ourselves to post(). on error, pass an empty lock
- post(std::unique_ptr<LockCompletion>{r}, ec,
- ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock});
- }
- void destroy() override {
- delete static_cast<LockCompletion*>(this);
- }
-};
-
-
-template <typename Executor>
-inline SharedMutex<Executor>::SharedMutex(const Executor& ex1)
- : ex1(ex1)
-{
-}
-
-template <typename Executor>
-inline SharedMutex<Executor>::~SharedMutex()
-{
- try {
- cancel();
- } catch (const std::exception&) {
- // swallow any exceptions, the destructor can't throw
- }
-}
-
-template <typename Executor>
-template <typename CompletionToken>
-auto SharedMutex<Executor>::async_lock(CompletionToken&& token)
-{
- using Signature = typename AsyncExclusiveRequest::Signature;
- boost::asio::async_completion<CompletionToken, Signature> init(token);
- auto& handler = init.completion_handler;
- {
- std::lock_guard lock{mutex};
-
- if (state == Unlocked) {
- state = Exclusive;
-
- // post the completion
- auto ex2 = boost::asio::get_associated_executor(handler, ex1);
- auto alloc2 = boost::asio::get_associated_allocator(handler);
- auto b = bind_handler(std::move(handler), boost::system::error_code{},
- std::unique_lock{*this, std::adopt_lock});
- ex2.post(forward_handler(std::move(b)), alloc2);
- } else {
- // create a request and add it to the exclusive list
- using LockCompletion = typename AsyncExclusiveRequest::LockCompletion;
- auto request = LockCompletion::create(ex1, std::move(handler), *this);
- exclusive_queue.push_back(*request.release());
- }
- }
- return init.result.get();
-}
-
-template <typename Executor>
-inline void SharedMutex<Executor>::lock()
-{
- boost::system::error_code ec;
- lock(ec);
- if (ec) {
- throw boost::system::system_error(ec);
- }
-}
-
-template <typename Executor>
-void SharedMutex<Executor>::lock(boost::system::error_code& ec)
-{
- std::unique_lock lock{mutex};
-
- if (state == Unlocked) {
- state = Exclusive;
- ec.clear();
- } else {
- SyncRequest request;
- exclusive_queue.push_back(request);
- ec = request.wait(lock);
- }
-}
-
-template <typename Executor>
-inline bool SharedMutex<Executor>::try_lock()
-{
- std::lock_guard lock{mutex};
-
- if (state == Unlocked) {
- state = Exclusive;
- return true;
- }
- return false;
-}
-
-template <typename Executor>
-void SharedMutex<Executor>::unlock()
-{
- RequestList granted;
- {
- std::lock_guard lock{mutex};
- assert(state == Exclusive);
-
- if (!exclusive_queue.empty()) {
- // grant next exclusive lock
- auto& request = exclusive_queue.front();
- exclusive_queue.pop_front();
- granted.push_back(request);
- } else {
- // grant shared locks, if any
- state = shared_queue.size();
- if (state > MaxShared) {
- state = MaxShared;
- auto end = std::next(shared_queue.begin(), MaxShared);
- granted.splice(granted.end(), shared_queue,
- shared_queue.begin(), end, MaxShared);
- } else {
- granted.splice(granted.end(), shared_queue);
- }
- }
- }
- complete(std::move(granted), boost::system::error_code{});
-}
-
-template <typename Executor>
-template <typename CompletionToken>
-auto SharedMutex<Executor>::async_lock_shared(CompletionToken&& token)
-{
- using Signature = typename AsyncSharedRequest::Signature;
- boost::asio::async_completion<CompletionToken, Signature> init(token);
- auto& handler = init.completion_handler;
- {
- std::lock_guard lock{mutex};
-
- if (exclusive_queue.empty() && state < MaxShared) {
- state++;
-
- auto ex2 = boost::asio::get_associated_executor(handler, ex1);
- auto alloc2 = boost::asio::get_associated_allocator(handler);
- auto b = bind_handler(std::move(handler), boost::system::error_code{},
- std::shared_lock{*this, std::adopt_lock});
- ex2.post(forward_handler(std::move(b)), alloc2);
- } else {
- using LockCompletion = typename AsyncSharedRequest::LockCompletion;
- auto request = LockCompletion::create(ex1, std::move(handler), *this);
- shared_queue.push_back(*request.release());
- }
- }
- return init.result.get();
-}
-
-template <typename Executor>
-inline void SharedMutex<Executor>::lock_shared()
-{
- boost::system::error_code ec;
- lock_shared(ec);
- if (ec) {
- throw boost::system::system_error(ec);
- }
-}
-
-template <typename Executor>
-void SharedMutex<Executor>::lock_shared(boost::system::error_code& ec)
-{
- std::unique_lock lock{mutex};
-
- if (exclusive_queue.empty() && state < MaxShared) {
- state++;
- ec.clear();
- } else {
- SyncRequest request;
- shared_queue.push_back(request);
- ec = request.wait(lock);
- }
-}
-
-template <typename Executor>
-inline bool SharedMutex<Executor>::try_lock_shared()
-{
- std::lock_guard lock{mutex};
-
- if (exclusive_queue.empty() && state < MaxShared) {
- state++;
- return true;
- }
- return false;
-}
-
-template <typename Executor>
-inline void SharedMutex<Executor>::unlock_shared()
-{
- std::lock_guard lock{mutex};
- assert(state != Unlocked && state <= MaxShared);
-
- if (state == 1 && !exclusive_queue.empty()) {
- // grant next exclusive lock
- state = Exclusive;
- auto& request = exclusive_queue.front();
- exclusive_queue.pop_front();
- request.complete(boost::system::error_code{});
- } else if (state == MaxShared && !shared_queue.empty() &&
- exclusive_queue.empty()) {
- // grant next shared lock
- auto& request = shared_queue.front();
- shared_queue.pop_front();
- request.complete(boost::system::error_code{});
- } else {
- state--;
- }
-}
-
-template <typename Executor>
-inline void SharedMutex<Executor>::cancel()
-{
- RequestList canceled;
- {
- std::lock_guard lock{mutex};
- canceled.splice(canceled.end(), shared_queue);
- canceled.splice(canceled.end(), exclusive_queue);
- }
- complete(std::move(canceled), boost::asio::error::operation_aborted);
-}
-
-template <typename Executor>
-void SharedMutex<Executor>::complete(RequestList&& requests,
- boost::system::error_code ec)
-{
- while (!requests.empty()) {
- auto& request = requests.front();
- requests.pop_front();
- try {
- request.complete(ec);
- } catch (...) {
- // clean up any remaining completions and rethrow
- requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); });
- throw;
- }
- }
-}
-
-} // namespace ceph::async
-
-#endif // CEPH_ASYNC_SHARED_MUTEX_H
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2018 Red Hat
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "common/async/shared_mutex.h"
-#include <optional>
-#include <gtest/gtest.h>
-
-namespace ceph::async {
-
-using executor_type = boost::asio::io_context::executor_type;
-using unique_lock = std::unique_lock<SharedMutex<executor_type>>;
-using shared_lock = std::shared_lock<SharedMutex<executor_type>>;
-
-// return a lambda that captures its error code and lock
-auto capture(std::optional<boost::system::error_code>& ec, unique_lock& lock)
-{
- return [&] (boost::system::error_code e, unique_lock l) {
- ec = e;
- lock = std::move(l);
- };
-}
-auto capture(std::optional<boost::system::error_code>& ec, shared_lock& lock)
-{
- return [&] (boost::system::error_code e, shared_lock l) {
- ec = e;
- lock = std::move(l);
- };
-}
-
-TEST(SharedMutex, async_exclusive)
-{
- boost::asio::io_context context;
- SharedMutex mutex(context.get_executor());
-
- std::optional<boost::system::error_code> ec1, ec2, ec3;
- unique_lock lock1, lock2, lock3;
-
- // request three exclusive locks
- mutex.async_lock(capture(ec1, lock1));
- mutex.async_lock(capture(ec2, lock2));
- mutex.async_lock(capture(ec3, lock3));
-
- EXPECT_FALSE(ec1); // no callbacks until poll()
- EXPECT_FALSE(ec2);
- EXPECT_FALSE(ec3);
-
- context.poll();
- EXPECT_FALSE(context.stopped()); // second lock still pending
-
- ASSERT_TRUE(ec1);
- EXPECT_EQ(boost::system::errc::success, *ec1);
- ASSERT_TRUE(lock1);
- EXPECT_FALSE(ec2);
-
- lock1.unlock();
-
- EXPECT_FALSE(ec2);
-
- context.poll();
- EXPECT_FALSE(context.stopped());
-
- ASSERT_TRUE(ec2);
- EXPECT_EQ(boost::system::errc::success, *ec2);
- ASSERT_TRUE(lock2);
- EXPECT_FALSE(ec3);
-
- lock2.unlock();
-
- EXPECT_FALSE(ec3);
-
- context.poll();
- EXPECT_TRUE(context.stopped());
-
- ASSERT_TRUE(ec3);
- EXPECT_EQ(boost::system::errc::success, *ec3);
- ASSERT_TRUE(lock3);
-}
-
-TEST(SharedMutex, async_shared)
-{
- boost::asio::io_context context;
- SharedMutex mutex(context.get_executor());
-
- std::optional<boost::system::error_code> ec1, ec2;
- shared_lock lock1, lock2;
-
- // request two shared locks
- mutex.async_lock_shared(capture(ec1, lock1));
- mutex.async_lock_shared(capture(ec2, lock2));
-
- EXPECT_FALSE(ec1); // no callbacks until poll()
- EXPECT_FALSE(ec2);
-
- context.poll();
- EXPECT_TRUE(context.stopped());
-
- ASSERT_TRUE(ec1);
- EXPECT_EQ(boost::system::errc::success, *ec1);
- ASSERT_TRUE(lock1);
- ASSERT_TRUE(ec2);
- EXPECT_EQ(boost::system::errc::success, *ec2);
- ASSERT_TRUE(lock2);
-}
-
-TEST(SharedMutex, async_exclusive_while_shared)
-{
- boost::asio::io_context context;
- SharedMutex mutex(context.get_executor());
-
- std::optional<boost::system::error_code> ec1, ec2;
- shared_lock lock1;
- unique_lock lock2;
-
- // request a shared and exclusive lock
- mutex.async_lock_shared(capture(ec1, lock1));
- mutex.async_lock(capture(ec2, lock2));
-
- EXPECT_FALSE(ec1); // no callbacks until poll()
- EXPECT_FALSE(ec2);
-
- context.poll();
- EXPECT_FALSE(context.stopped()); // second lock still pending
-
- ASSERT_TRUE(ec1);
- EXPECT_EQ(boost::system::errc::success, *ec1);
- ASSERT_TRUE(lock1);
- EXPECT_FALSE(ec2);
-
- lock1.unlock();
-
- EXPECT_FALSE(ec2);
-
- context.poll();
- EXPECT_TRUE(context.stopped());
-
- ASSERT_TRUE(ec2);
- EXPECT_EQ(boost::system::errc::success, *ec2);
- ASSERT_TRUE(lock2);
-}
-
-TEST(SharedMutex, async_shared_while_exclusive)
-{
- boost::asio::io_context context;
- SharedMutex mutex(context.get_executor());
-
- std::optional<boost::system::error_code> ec1, ec2;
- unique_lock lock1;
- shared_lock lock2;
-
- // request an exclusive and shared lock
- mutex.async_lock(capture(ec1, lock1));
- mutex.async_lock_shared(capture(ec2, lock2));
-
- EXPECT_FALSE(ec1); // no callbacks until poll()
- EXPECT_FALSE(ec2);
-
- context.poll();
- EXPECT_FALSE(context.stopped()); // second lock still pending
-
- ASSERT_TRUE(ec1);
- EXPECT_EQ(boost::system::errc::success, *ec1);
- ASSERT_TRUE(lock1);
- EXPECT_FALSE(ec2);
-
- lock1.unlock();
-
- EXPECT_FALSE(ec2);
-
- context.poll();
- EXPECT_TRUE(context.stopped());
-
- ASSERT_TRUE(ec2);
- EXPECT_EQ(boost::system::errc::success, *ec2);
- ASSERT_TRUE(lock2);
-}
-
-TEST(SharedMutex, async_prioritize_exclusive)
-{
- boost::asio::io_context context;
- SharedMutex mutex(context.get_executor());
-
- std::optional<boost::system::error_code> ec1, ec2, ec3;
- shared_lock lock1, lock3;
- unique_lock lock2;
-
- // acquire a shared lock, then request an exclusive and another shared lock
- mutex.async_lock_shared(capture(ec1, lock1));
- mutex.async_lock(capture(ec2, lock2));
- mutex.async_lock_shared(capture(ec3, lock3));
-
- EXPECT_FALSE(ec1); // no callbacks until poll()
- EXPECT_FALSE(ec2);
- EXPECT_FALSE(ec3);
-
- context.poll();
- EXPECT_FALSE(context.stopped());
-
- ASSERT_TRUE(ec1);
- EXPECT_EQ(boost::system::errc::success, *ec1);
- ASSERT_TRUE(lock1);
- EXPECT_FALSE(ec2);
- // exclusive waiter blocks the second shared lock
- EXPECT_FALSE(ec3);
-
- lock1.unlock();
-
- EXPECT_FALSE(ec2);
- EXPECT_FALSE(ec3);
-
- context.poll();
- EXPECT_FALSE(context.stopped());
-
- ASSERT_TRUE(ec2);
- EXPECT_EQ(boost::system::errc::success, *ec2);
- ASSERT_TRUE(lock2);
- EXPECT_FALSE(ec3);
-}
-
-TEST(SharedMutex, async_cancel)
-{
- boost::asio::io_context context;
- SharedMutex mutex(context.get_executor());
-
- std::optional<boost::system::error_code> ec1, ec2, ec3, ec4;
- unique_lock lock1, lock2;
- shared_lock lock3, lock4;
-
- // request 2 exclusive and shared locks
- mutex.async_lock(capture(ec1, lock1));
- mutex.async_lock(capture(ec2, lock2));
- mutex.async_lock_shared(capture(ec3, lock3));
- mutex.async_lock_shared(capture(ec4, lock4));
-
- EXPECT_FALSE(ec1); // no callbacks until poll()
- EXPECT_FALSE(ec2);
- EXPECT_FALSE(ec3);
- EXPECT_FALSE(ec4);
-
- context.poll();
- EXPECT_FALSE(context.stopped());
-
- ASSERT_TRUE(ec1);
- EXPECT_EQ(boost::system::errc::success, *ec1);
- ASSERT_TRUE(lock1);
- EXPECT_FALSE(ec2);
- EXPECT_FALSE(ec3);
- EXPECT_FALSE(ec4);
-
- mutex.cancel();
-
- EXPECT_FALSE(ec2);
- EXPECT_FALSE(ec3);
- EXPECT_FALSE(ec4);
-
- context.poll();
- EXPECT_TRUE(context.stopped());
-
- ASSERT_TRUE(ec2);
- EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
- EXPECT_FALSE(lock2);
- ASSERT_TRUE(ec3);
- EXPECT_EQ(boost::asio::error::operation_aborted, *ec3);
- EXPECT_FALSE(lock3);
- ASSERT_TRUE(ec4);
- EXPECT_EQ(boost::asio::error::operation_aborted, *ec4);
- EXPECT_FALSE(lock4);
-}
-
-TEST(SharedMutex, async_destruct)
-{
- boost::asio::io_context context;
-
- std::optional<boost::system::error_code> ec1, ec2, ec3, ec4;
- unique_lock lock1, lock2;
- shared_lock lock3, lock4;
-
- {
- SharedMutex mutex(context.get_executor());
-
- // request 2 exclusive and shared locks
- mutex.async_lock(capture(ec1, lock1));
- mutex.async_lock(capture(ec2, lock2));
- mutex.async_lock_shared(capture(ec3, lock3));
- mutex.async_lock_shared(capture(ec4, lock4));
- }
-
- EXPECT_FALSE(ec1); // no callbacks until poll()
- EXPECT_FALSE(ec2);
- EXPECT_FALSE(ec3);
- EXPECT_FALSE(ec4);
-
- context.poll();
- EXPECT_TRUE(context.stopped());
-
- ASSERT_TRUE(ec1);
- EXPECT_EQ(boost::system::errc::success, *ec1);
- ASSERT_TRUE(lock1);
- ASSERT_TRUE(ec2);
- EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
- EXPECT_FALSE(lock2);
- ASSERT_TRUE(ec3);
- EXPECT_EQ(boost::asio::error::operation_aborted, *ec3);
- EXPECT_FALSE(lock3);
- ASSERT_TRUE(ec4);
- EXPECT_EQ(boost::asio::error::operation_aborted, *ec4);
- EXPECT_FALSE(lock4);
-}
-
-// return a capture() lambda that's bound to the given executor
-template <typename Executor, typename ...Args>
-auto capture_ex(const Executor& ex, Args&& ...args)
-{
- return boost::asio::bind_executor(ex, capture(std::forward<Args>(args)...));
-}
-
-TEST(SharedMutex, cross_executor)
-{
- boost::asio::io_context mutex_context;
- SharedMutex mutex(mutex_context.get_executor());
-
- boost::asio::io_context callback_context;
- auto ex2 = callback_context.get_executor();
-
- std::optional<boost::system::error_code> ec1, ec2;
- unique_lock lock1, lock2;
-
- // request two exclusive locks
- mutex.async_lock(capture_ex(ex2, ec1, lock1));
- mutex.async_lock(capture_ex(ex2, ec2, lock2));
-
- EXPECT_FALSE(ec1);
- EXPECT_FALSE(ec2);
-
- mutex_context.poll();
- EXPECT_FALSE(mutex_context.stopped()); // maintains work on both executors
-
- EXPECT_FALSE(ec1); // no callbacks until poll() on callback_context
- EXPECT_FALSE(ec2);
-
- callback_context.poll();
- EXPECT_FALSE(callback_context.stopped()); // second lock still pending
-
- ASSERT_TRUE(ec1);
- EXPECT_EQ(boost::system::errc::success, *ec1);
- ASSERT_TRUE(lock1);
- EXPECT_FALSE(ec2);
-
- lock1.unlock();
-
- mutex_context.poll();
- EXPECT_TRUE(mutex_context.stopped());
-
- EXPECT_FALSE(ec2);
-
- callback_context.poll();
- EXPECT_TRUE(callback_context.stopped());
-
- ASSERT_TRUE(ec2);
- EXPECT_EQ(boost::system::errc::success, *ec2);
- ASSERT_TRUE(lock2);
-}
-
-TEST(SharedMutex, try_exclusive)
-{
- boost::asio::io_context context;
- SharedMutex mutex(context.get_executor());
- {
- std::lock_guard lock{mutex};
- ASSERT_FALSE(mutex.try_lock()); // fail during exclusive
- }
- {
- std::shared_lock lock{mutex};
- ASSERT_FALSE(mutex.try_lock()); // fail during shared
- }
- ASSERT_TRUE(mutex.try_lock());
- mutex.unlock();
-}
-
-TEST(SharedMutex, try_shared)
-{
- boost::asio::io_context context;
- SharedMutex mutex(context.get_executor());
- {
- std::lock_guard lock{mutex};
- ASSERT_FALSE(mutex.try_lock_shared()); // fail during exclusive
- }
- {
- std::shared_lock lock{mutex};
- ASSERT_TRUE(mutex.try_lock_shared()); // succeed during shared
- mutex.unlock_shared();
- }
- ASSERT_TRUE(mutex.try_lock_shared());
- mutex.unlock_shared();
-}
-
-TEST(SharedMutex, cancel)
-{
- boost::asio::io_context context;
- SharedMutex mutex(context.get_executor());
-
- std::lock_guard l{mutex}; // exclusive lock blocks others
-
- // make synchronous lock calls in other threads
- auto f1 = std::async(std::launch::async, [&] { mutex.lock(); });
- auto f2 = std::async(std::launch::async, [&] { mutex.lock_shared(); });
-
- // this will race with spawned threads. just keep canceling until the
- // futures are ready
- const auto t = std::chrono::milliseconds(1);
- do { mutex.cancel(); } while (f1.wait_for(t) != std::future_status::ready);
- do { mutex.cancel(); } while (f2.wait_for(t) != std::future_status::ready);
-
- EXPECT_THROW(f1.get(), boost::system::system_error);
- EXPECT_THROW(f2.get(), boost::system::system_error);
-}
-
-} // namespace ceph::async