From: Casey Bodley Date: Wed, 25 Jan 2023 18:22:30 +0000 (-0500) Subject: rgw/sync: add with_lease() with polymorphic LockClient X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=e1b9b6c7473577fe3af9b848552e0b9de5ae8125;p=ceph.git rgw/sync: add with_lease() with polymorphic LockClient Signed-off-by: Casey Bodley --- diff --git a/src/rgw/sync/common.h b/src/rgw/sync/common.h new file mode 100644 index 0000000000000..6b0cc7f302cc8 --- /dev/null +++ b/src/rgw/sync/common.h @@ -0,0 +1,35 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace rgw::sync { + +using error_code = boost::system::error_code; + +namespace asio = boost::asio; + +using default_executor = asio::strand; + +template +using awaitable = asio::awaitable; + +using use_awaitable_t = asio::use_awaitable_t; +static constexpr use_awaitable_t use_awaitable{}; + +} // namespace rgw::sync diff --git a/src/rgw/sync/detail/lease_state.h b/src/rgw/sync/detail/lease_state.h new file mode 100644 index 0000000000000..479466b545397 --- /dev/null +++ b/src/rgw/sync/detail/lease_state.h @@ -0,0 +1,175 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/async/co_waiter.h" +#include "common/async/service.h" +#include "common/ceph_time.h" +#include "sync/common.h" + +namespace rgw::sync::detail { + +using lease_clock = ceph::coarse_mono_clock; +using lease_timer = asio::basic_waitable_timer, default_executor>; + +// base class for the lease completion state. this contains everything that +// doesn't depend on the coroutine's return type +class lease_state : public ceph::async::service_list_base_hook { + ceph::async::service* svc = nullptr; + lease_timer timer; + asio::cancellation_signal signal; + std::exception_ptr eptr; + ceph::async::co_waiter waiter; + + public: + lease_state(default_executor ex) : timer(ex) {} + ~lease_state() + { + if (svc) { + svc->remove(*this); + } + } + + lease_timer& get_timer() { return timer; } + + asio::cancellation_slot get_cancellation_slot() { return signal.slot(); } + + awaitable wait() + { + if (!svc) { + // register for service_shutdown() notifications + svc = &asio::use_service>( + asio::query(co_await asio::this_coro::executor, + asio::execution::context)); + svc->add(*this); + } + co_await waiter.get(); + } + + void complete() + { + if (waiter.waiting()) { + waiter.complete(nullptr); + } else { + timer.cancel(); // wake the renewal loop + } + } + + bool aborted() const { return !!eptr; } + + void abort(std::exception_ptr e) + { + if (!eptr) { // only the first exception is reported + eptr = e; + cancel(); + } + } + + void rethrow() + { + if (eptr) { + std::rethrow_exception(eptr); + } + } + + void cancel() + { + signal.emit(asio::cancellation_type::terminal); + timer.cancel(); + } + + void service_shutdown() + { + waiter.shutdown(); + } +}; + +// capture and return the arguments to cr's completion handler +template +class lease_completion_state : public lease_state, + public boost::intrusive_ref_counter, + boost::thread_unsafe_counter> +{ + using result_type = std::pair; + std::optional result; + public: + lease_completion_state(default_executor ex) : lease_state(ex) {} + + bool completed() const { return result.has_value(); } + + auto completion_handler() + { + return asio::bind_cancellation_slot(get_cancellation_slot(), + [self = boost::intrusive_ptr{this}] (std::exception_ptr eptr, T val) { + self->result.emplace(eptr, std::move(val)); + self->complete(); + }); + } + + T get() // precondition: completed() + { + rethrow(); // rethrow exceptions from renewal + if (auto eptr = std::get<0>(*result); eptr) { + std::rethrow_exception(eptr); + } + return std::get<1>(std::move(*result)); + } +}; + +// specialization for awaitable +template<> +class lease_completion_state : public lease_state, + public boost::intrusive_ref_counter, + boost::thread_unsafe_counter> +{ + using result_type = std::exception_ptr; + std::optional result; + public: + lease_completion_state(default_executor ex) : lease_state(ex) {} + + bool completed() const { return result.has_value(); } + + auto completion_handler() + { + return asio::bind_cancellation_slot(get_cancellation_slot(), + [self = boost::intrusive_ptr{this}] (std::exception_ptr eptr) { + self->result = eptr; + self->complete(); + }); + } + + void get() // precondition: completed() + { + rethrow(); // rethrow exceptions from renewal + if (*result) { + std::rethrow_exception(*result); + } + } +}; + +template +auto make_lease_completion_state(default_executor ex) +{ + return boost::intrusive_ptr{new lease_completion_state(ex)}; +} + +} // namespace rgw::sync::detail diff --git a/src/rgw/sync/lease.h b/src/rgw/sync/lease.h new file mode 100644 index 0000000000000..1457d0c8d1069 --- /dev/null +++ b/src/rgw/sync/lease.h @@ -0,0 +1,125 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include "include/scope_guard.h" +#include "common/ceph_time.h" +#include "detail/lease_state.h" +#include "common.h" + +namespace rgw::sync { + +/// \brief Client interface for a specific timed distributed exclusive lock. +class LockClient { + public: + virtual ~LockClient() {} + + /// Acquire a timed lock for the given duration, or throw on error. + virtual awaitable acquire(ceph::timespan duration) = 0; + /// Renew an acquired lock for the given duration, or throw on error. + virtual awaitable renew(ceph::timespan duration) = 0; + /// Release an acquired lock, or throw on error. + virtual awaitable release() = 0; +}; + + +/// \brief Call a coroutine under the protection of a continuous lease. +/// +/// Acquires exclusive access to a timed lock, then spawns the given coroutine +/// \ref cr. The lock is renewed at intervals of \ref duration / 2. The +/// coroutine is canceled if the lock is lost before its completion. +/// +/// Exceptions thrown by release() are ignored, but exceptions from acquire() +/// and renew() propagate back to the caller. If renew() is delayed long +/// enough for the lock to expire, a boost::system::system_error exception is +/// thrown with an error code matching boost::system::errc::timed_out. +/// +/// Otherwise, the result of \ref cr is returned to the caller, whether by +/// exception or return value. +/// +/// \relates LockClient +/// +/// \param lock A client that can send lock requests +/// \param duration Duration of the lock +/// \param cr The coroutine to call under lease +/// +/// \tparam T The return type of the coroutine \ref cr +template +auto with_lease(LockClient& lock, + ceph::timespan duration, + awaitable cr) + -> awaitable +{ + auto ex = co_await asio::this_coro::executor; + + // acquire the lock. exceptions propagate directly to the caller + co_await lock.acquire(duration); + auto expires_at = detail::lease_clock::now() + duration; + + // allocate the lease state with scoped cancellation so that with_lease()'s + // cancellation triggers cancellation of the spawned coroutine + auto state = detail::make_lease_completion_state(ex); + const auto state_guard = make_scope_guard([&state] { state->cancel(); }); + + // spawn the coroutine with a waitable/cancelable completion handler + asio::co_spawn(ex, std::move(cr), state->completion_handler()); + + // lock renewal loop + auto& timer = state->get_timer(); + const ceph::timespan interval = duration / 2; + + while (!state->aborted() && !state->completed()) { + // sleep until the next lock interval + timer.expires_after(interval); + try { + co_await timer.async_wait(use_awaitable); + } catch (const std::exception&) { + break; // timer canceled by cr's completion, or caller canceled + } + + // arm a timeout for the renew request + timer.expires_at(expires_at); + timer.async_wait([state] (error_code ec) { + if (!ec) { + state->abort(std::make_exception_ptr( + boost::system::system_error( + ETIMEDOUT, boost::system::system_category()))); + } + }); + + try { + co_await lock.renew(duration); + expires_at = detail::lease_clock::now() + duration; + } catch (const std::exception&) { + state->abort(std::current_exception()); + expires_at = detail::lease_clock::zero(); // don't release below + break; + } + } + timer.cancel(); + + // if cr was canceled, await its completion before releasing the lock + if (!state->completed()) { + co_await state->wait(); + } + + // release the lock if it hasn't expired + if (detail::lease_clock::now() < expires_at) try { + co_await lock.release(); + } catch (const std::exception&) {} // ignore errors + + // return the spawned coroutine's result + co_return state->get(); +} + +} // namespace rgw::sync diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 0f99597c21e23..4f29c9dcd92f5 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -185,10 +185,17 @@ add_executable(unittest_rgw_putobj test_rgw_putobj.cc) add_ceph_unittest(unittest_rgw_putobj) target_link_libraries(unittest_rgw_putobj ${rgw_libs} ${UNITTEST_LIBS}) + add_executable(unittest_rgw_throttle test_rgw_throttle.cc) add_ceph_unittest(unittest_rgw_throttle) target_link_libraries(unittest_rgw_throttle ${rgw_libs} ${UNITTEST_LIBS}) +add_executable(unittest_rgw_sync_lease test_rgw_sync_lease.cc) +target_include_directories(unittest_rgw_sync_lease + PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw") +add_ceph_unittest(unittest_rgw_sync_lease) +target_link_libraries(unittest_rgw_sync_lease ceph-common ${UNITTEST_LIBS}) + add_executable(unittest_rgw_iam_policy test_rgw_iam_policy.cc) add_ceph_unittest(unittest_rgw_iam_policy) target_link_libraries(unittest_rgw_iam_policy diff --git a/src/test/rgw/test_rgw_sync_lease.cc b/src/test/rgw/test_rgw_sync_lease.cc new file mode 100644 index 0000000000000..d0bd15fd24c86 --- /dev/null +++ b/src/test/rgw/test_rgw_sync_lease.cc @@ -0,0 +1,704 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING +#include "sync/lease.h" + +#include +#include +#include +#include "common/async/co_waiter.h" + +namespace rgw::sync { + +using namespace std::chrono_literals; + +// injects delays and exceptions into MockClient +using MockWaiter = ceph::async::co_waiter; + +enum class event { acquire, renew, release }; + +// a LockClient that logs events and injects delays and exceptions +class MockClient : public LockClient { + MockWaiter waiter; + public: + std::vector events; // tracks the sequence of LockClient calls + + awaitable acquire(ceph::timespan) override { + events.push_back(event::acquire); + co_await waiter.get(); + } + awaitable renew(ceph::timespan) override { + events.push_back(event::renew); + co_await waiter.get(); + } + awaitable release() override { + events.push_back(event::release); + co_await waiter.get(); + } + + void complete(std::exception_ptr eptr) { + waiter.complete(eptr); + } +}; + +template +auto capture(std::optional& opt) +{ + return [&opt] (T value) { + opt = std::move(value); + }; +} + +template +auto capture(asio::cancellation_signal& signal, std::optional& opt) +{ + return asio::bind_cancellation_slot(signal.slot(), capture(opt)); +} + +template +auto capture(std::optional>& opt) +{ + return [&opt] (Args ...args) { + opt = std::forward_as_tuple(std::forward(args)...); + }; +} + + +TEST(with_lease, return_void) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto cr = [] () -> awaitable { co_return; }; + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until release blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::release, locker.events.back()); + + locker.complete(nullptr); // unblock release + + ctx.poll(); + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); +} + +TEST(with_lease, return_value) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + using ptr = std::unique_ptr; // test a move-only return type + auto cr = [] () -> awaitable { co_return std::make_unique(42); }; + + using result_type = std::tuple; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until release blocks + EXPECT_FALSE(result); + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::release, locker.events.back()); + + locker.complete(nullptr); // unblock release + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(std::get<0>(*result)); + ASSERT_TRUE(std::get<1>(*result)); + EXPECT_EQ(42, *std::get<1>(*result)); +} + +TEST(with_lease, spawned_exception) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto cr = [] () -> awaitable { + throw std::runtime_error{"oops"}; + co_return; + }; + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until release blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::release, locker.events.back()); + + locker.complete(nullptr); // unblock release + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); +} + +TEST(with_lease, acquire_exception) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto cr = [] () -> awaitable { co_return; }; + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); +} + +TEST(with_lease, acquire_shutdown) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto cr = [] () -> awaitable { co_return; }; + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + // shut down before acquire completes +} + +TEST(with_lease, acquire_cancel) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto cr = [] () -> awaitable { co_return; }; + + asio::cancellation_signal signal; + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(signal, result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + // cancel before acquire completes + signal.emit(asio::cancellation_type::terminal); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); // throws on cancellation + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception& e) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(with_lease, acquired_shutdown) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto waiter = MockWaiter{}; + auto cr = waiter.get(); // cr is a wait that never completes + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // start cr and renewal timer + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + // shut down before renewal timer +} + +TEST(with_lease, acquired_cancel) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto waiter = MockWaiter{}; + auto cr = waiter.get(); // cr is a wait that never completes + + asio::cancellation_signal signal; + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), + capture(signal, result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // wait for acquire to finish + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + // cancel before renewal timer + signal.emit(asio::cancellation_type::terminal); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); // throws on cancellation + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception& e) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(with_lease, renew_exception) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto waiter = MockWaiter{}; + auto cr = waiter.get(); // cr is a wait that never completes + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until renew timer blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + ctx.run_one(); // wait ~5ms for renew timer + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::renew, locker.events.back()); + + // inject an exception on renew + locker.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error); +} + +TEST(with_lease, renew_after_timeout) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto waiter = MockWaiter{}; + auto cr = waiter.get(); // cr is a wait that never completes + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until renew timer blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + ctx.run_one(); // wait ~5ms for renew timer + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::renew, locker.events.back()); + + ctx.run_one(); // wait for renew timeout + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + locker.complete(nullptr); // unblock renew + + ctx.poll(); // run until release blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + ASSERT_EQ(3, locker.events.size()); + EXPECT_EQ(event::release, locker.events.back()); + + locker.complete(nullptr); // unblock release + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), boost::system::errc::timed_out); + } catch (const std::exception& e) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(with_lease, renew_exception_after_timeout) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto waiter = MockWaiter{}; + auto cr = waiter.get(); // cr is a wait that never completes + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until renew timer blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + ctx.run_one(); // wait ~5ms for renew timer + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::renew, locker.events.back()); + + ctx.run_one(); // wait for renew timeout + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + // inject an exception on renew + locker.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), boost::system::errc::timed_out); + } catch (const std::exception& e) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(with_lease, renew_cancel_after_timeout) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto waiter = MockWaiter{}; + auto cr = waiter.get(); // cr is a wait that never completes + + using result_type = std::exception_ptr; + std::optional result; + asio::cancellation_signal signal; + asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), + capture(signal, result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until renew timer blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + ctx.run_one(); // wait ~5ms for renew timer + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::renew, locker.events.back()); + + ctx.run_one(); // wait for renew timeout + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + // cancel before renew completes + signal.emit(asio::cancellation_type::terminal); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_EQ(2, locker.events.size()); // no release + ASSERT_TRUE(result); + ASSERT_TRUE(*result); + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), boost::system::errc::timed_out); + } catch (const std::exception& e) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(with_lease, renew_shutdown) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto waiter = MockWaiter{}; + auto cr = waiter.get(); // cr is a wait that never completes + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until renew timer blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + ctx.run_one(); // wait ~5ms for renew timer + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::renew, locker.events.back()); + // shut down before renew completes +} + +TEST(with_lease, renew_cancel) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto waiter = MockWaiter{}; + auto cr = waiter.get(); // cr is a wait that never completes + + using result_type = std::exception_ptr; + std::optional result; + asio::cancellation_signal signal; + asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), + capture(signal, result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until renew timer blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + + ctx.run_one(); // wait ~5ms for renew timer + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::renew, locker.events.back()); + + // cancel before renew completes + signal.emit(asio::cancellation_type::terminal); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_TRUE(*result); // throws on cancellation + try { + std::rethrow_exception(*result); + } catch (const boost::system::system_error& e) { + EXPECT_EQ(e.code(), asio::error::operation_aborted); + } catch (const std::exception& e) { + EXPECT_THROW(throw, boost::system::system_error); + } +} + +TEST(with_lease, release_exception) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto cr = [] () -> awaitable { co_return; }; + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until release blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::release, locker.events.back()); + + // inject an exception on release + locker.complete(std::make_exception_ptr(std::runtime_error{"oops"})); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + EXPECT_FALSE(*result); // release exceptions are ignored +} + +TEST(with_lease, release_shutdown) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto cr = [] () -> awaitable { co_return; }; + + using result_type = std::exception_ptr; + std::optional result; + asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until release blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::release, locker.events.back()); + // shut down before release completes +} + +TEST(with_lease, release_cancel) +{ + boost::asio::io_context ctx; + auto ex = default_executor{ctx.get_executor()}; + auto locker = MockClient{}; + + auto cr = [] () -> awaitable { co_return; }; + + using result_type = std::exception_ptr; + std::optional result; + asio::cancellation_signal signal; + asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(signal, result)); + + ctx.poll(); // run until acquire blocks + ASSERT_FALSE(ctx.stopped()); + ASSERT_EQ(1, locker.events.size()); + EXPECT_EQ(event::acquire, locker.events.back()); + + locker.complete(nullptr); // unblock acquire + + ctx.poll(); // run until release blocks + ASSERT_FALSE(ctx.stopped()); + EXPECT_FALSE(result); + ASSERT_EQ(2, locker.events.size()); + EXPECT_EQ(event::release, locker.events.back()); + + // cancel before release completes + signal.emit(asio::cancellation_type::terminal); + + ctx.poll(); // run to completion + ASSERT_TRUE(ctx.stopped()); + ASSERT_TRUE(result); + ASSERT_FALSE(*result); // exception is ignored +} + +} // namespace rgw::sync