]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/sync: add with_lease() with polymorphic LockClient
authorCasey Bodley <cbodley@redhat.com>
Wed, 25 Jan 2023 18:22:30 +0000 (13:22 -0500)
committerAdam Emerson <aemerson@redhat.com>
Thu, 14 Sep 2023 21:48:00 +0000 (17:48 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/sync/common.h [new file with mode: 0644]
src/rgw/sync/detail/lease_state.h [new file with mode: 0644]
src/rgw/sync/lease.h [new file with mode: 0644]
src/test/rgw/CMakeLists.txt
src/test/rgw/test_rgw_sync_lease.cc [new file with mode: 0644]

diff --git a/src/rgw/sync/common.h b/src/rgw/sync/common.h
new file mode 100644 (file)
index 0000000..6b0cc7f
--- /dev/null
@@ -0,0 +1,35 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <boost/asio/awaitable.hpp>
+#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/io_context.hpp>
+#include <boost/asio/strand.hpp>
+#include <boost/system/error_code.hpp>
+
+namespace rgw::sync {
+
+using error_code = boost::system::error_code;
+
+namespace asio = boost::asio;
+
+using default_executor = asio::strand<asio::io_context::executor_type>;
+
+template <typename T>
+using awaitable = asio::awaitable<T, default_executor>;
+
+using use_awaitable_t = asio::use_awaitable_t<default_executor>;
+static constexpr use_awaitable_t use_awaitable{};
+
+} // namespace rgw::sync
diff --git a/src/rgw/sync/detail/lease_state.h b/src/rgw/sync/detail/lease_state.h
new file mode 100644 (file)
index 0000000..479466b
--- /dev/null
@@ -0,0 +1,175 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <optional>
+#include <utility>
+#include <boost/asio/async_result.hpp>
+#include <boost/asio/basic_waitable_timer.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
+#include <boost/asio/cancellation_signal.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/smart_ptr/intrusive_ref_counter.hpp>
+#include "common/async/co_waiter.h"
+#include "common/async/service.h"
+#include "common/ceph_time.h"
+#include "sync/common.h"
+
+namespace rgw::sync::detail {
+
+using lease_clock = ceph::coarse_mono_clock;
+using lease_timer = asio::basic_waitable_timer<lease_clock,
+      asio::wait_traits<lease_clock>, default_executor>;
+
+// base class for the lease completion state. this contains everything that
+// doesn't depend on the coroutine's return type
+class lease_state : public ceph::async::service_list_base_hook {
+  ceph::async::service<lease_state>* svc = nullptr;
+  lease_timer timer;
+  asio::cancellation_signal signal;
+  std::exception_ptr eptr;
+  ceph::async::co_waiter<void, default_executor> waiter;
+
+ public:
+  lease_state(default_executor ex) : timer(ex) {}
+  ~lease_state()
+  {
+    if (svc) {
+      svc->remove(*this);
+    }
+  }
+
+  lease_timer& get_timer() { return timer; }
+
+  asio::cancellation_slot get_cancellation_slot() { return signal.slot(); }
+
+  awaitable<void> wait()
+  {
+    if (!svc) {
+      // register for service_shutdown() notifications
+      svc = &asio::use_service<ceph::async::service<lease_state>>(
+          asio::query(co_await asio::this_coro::executor,
+                      asio::execution::context));
+      svc->add(*this);
+    }
+    co_await waiter.get();
+  }
+
+  void complete()
+  {
+    if (waiter.waiting()) {
+      waiter.complete(nullptr);
+    } else {
+      timer.cancel(); // wake the renewal loop
+    }
+  }
+
+  bool aborted() const { return !!eptr; }
+
+  void abort(std::exception_ptr e)
+  {
+    if (!eptr) { // only the first exception is reported
+      eptr = e;
+      cancel();
+    }
+  }
+
+  void rethrow()
+  {
+    if (eptr) {
+      std::rethrow_exception(eptr);
+    }
+  }
+
+  void cancel()
+  {
+    signal.emit(asio::cancellation_type::terminal);
+    timer.cancel();
+  }
+
+  void service_shutdown()
+  {
+    waiter.shutdown();
+  }
+};
+
+// capture and return the arguments to cr's completion handler
+template <typename T>
+class lease_completion_state : public lease_state,
+    public boost::intrusive_ref_counter<lease_completion_state<T>,
+        boost::thread_unsafe_counter>
+{
+  using result_type = std::pair<std::exception_ptr, T>;
+  std::optional<result_type> result;
+ public:
+  lease_completion_state(default_executor ex) : lease_state(ex) {}
+
+  bool completed() const { return result.has_value(); }
+
+  auto completion_handler()
+  {
+    return asio::bind_cancellation_slot(get_cancellation_slot(),
+        [self = boost::intrusive_ptr{this}] (std::exception_ptr eptr, T val) {
+          self->result.emplace(eptr, std::move(val));
+          self->complete();
+        });
+  }
+
+  T get() // precondition: completed()
+  {
+    rethrow(); // rethrow exceptions from renewal
+    if (auto eptr = std::get<0>(*result); eptr) {
+      std::rethrow_exception(eptr);
+    }
+    return std::get<1>(std::move(*result));
+  }
+};
+
+// specialization for awaitable<void>
+template<>
+class lease_completion_state<void> : public lease_state,
+    public boost::intrusive_ref_counter<lease_completion_state<void>,
+        boost::thread_unsafe_counter>
+{
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+ public:
+  lease_completion_state(default_executor ex) : lease_state(ex) {}
+
+  bool completed() const { return result.has_value(); }
+
+  auto completion_handler()
+  {
+    return asio::bind_cancellation_slot(get_cancellation_slot(),
+        [self = boost::intrusive_ptr{this}] (std::exception_ptr eptr) {
+          self->result = eptr;
+          self->complete();
+        });
+  }
+
+  void get() // precondition: completed()
+  {
+    rethrow(); // rethrow exceptions from renewal
+    if (*result) {
+      std::rethrow_exception(*result);
+    }
+  }
+};
+
+template <typename T>
+auto make_lease_completion_state(default_executor ex)
+{
+  return boost::intrusive_ptr{new lease_completion_state<T>(ex)};
+}
+
+} // namespace rgw::sync::detail
diff --git a/src/rgw/sync/lease.h b/src/rgw/sync/lease.h
new file mode 100644 (file)
index 0000000..1457d0c
--- /dev/null
@@ -0,0 +1,125 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "include/scope_guard.h"
+#include "common/ceph_time.h"
+#include "detail/lease_state.h"
+#include "common.h"
+
+namespace rgw::sync {
+
+/// \brief Client interface for a specific timed distributed exclusive lock.
+class LockClient {
+ public:
+  virtual ~LockClient() {}
+
+  /// Acquire a timed lock for the given duration, or throw on error.
+  virtual awaitable<void> acquire(ceph::timespan duration) = 0;
+  /// Renew an acquired lock for the given duration, or throw on error.
+  virtual awaitable<void> renew(ceph::timespan duration) = 0;
+  /// Release an acquired lock, or throw on error.
+  virtual awaitable<void> release() = 0;
+};
+
+
+/// \brief Call a coroutine under the protection of a continuous lease.
+///
+/// Acquires exclusive access to a timed lock, then spawns the given coroutine
+/// \ref cr. The lock is renewed at intervals of \ref duration / 2. The
+/// coroutine is canceled if the lock is lost before its completion.
+///
+/// Exceptions thrown by release() are ignored, but exceptions from acquire()
+/// and renew() propagate back to the caller. If renew() is delayed long
+/// enough for the lock to expire, a boost::system::system_error exception is
+/// thrown with an error code matching boost::system::errc::timed_out.
+///
+/// Otherwise, the result of \ref cr is returned to the caller, whether by
+/// exception or return value.
+///
+/// \relates LockClient
+///
+/// \param lock A client that can send lock requests
+/// \param duration Duration of the lock
+/// \param cr The coroutine to call under lease
+///
+/// \tparam T The return type of the coroutine \ref cr
+template <typename T>
+auto with_lease(LockClient& lock,
+                ceph::timespan duration,
+                awaitable<T> cr)
+    -> awaitable<T>
+{
+  auto ex = co_await asio::this_coro::executor;
+
+  // acquire the lock. exceptions propagate directly to the caller
+  co_await lock.acquire(duration);
+  auto expires_at = detail::lease_clock::now() + duration;
+
+  // allocate the lease state with scoped cancellation so that with_lease()'s
+  // cancellation triggers cancellation of the spawned coroutine
+  auto state = detail::make_lease_completion_state<T>(ex);
+  const auto state_guard = make_scope_guard([&state] { state->cancel(); });
+
+  // spawn the coroutine with a waitable/cancelable completion handler
+  asio::co_spawn(ex, std::move(cr), state->completion_handler());
+
+  // lock renewal loop
+  auto& timer = state->get_timer();
+  const ceph::timespan interval = duration / 2;
+
+  while (!state->aborted() && !state->completed()) {
+    // sleep until the next lock interval
+    timer.expires_after(interval);
+    try {
+      co_await timer.async_wait(use_awaitable);
+    } catch (const std::exception&) {
+      break; // timer canceled by cr's completion, or caller canceled
+    }
+
+    // arm a timeout for the renew request
+    timer.expires_at(expires_at);
+    timer.async_wait([state] (error_code ec) {
+          if (!ec) {
+            state->abort(std::make_exception_ptr(
+                boost::system::system_error(
+                    ETIMEDOUT, boost::system::system_category())));
+          }
+        });
+
+    try {
+      co_await lock.renew(duration);
+      expires_at = detail::lease_clock::now() + duration;
+    } catch (const std::exception&) {
+      state->abort(std::current_exception());
+      expires_at = detail::lease_clock::zero(); // don't release below
+      break;
+    }
+  }
+  timer.cancel();
+
+  // if cr was canceled, await its completion before releasing the lock
+  if (!state->completed()) {
+    co_await state->wait();
+  }
+
+  // release the lock if it hasn't expired
+  if (detail::lease_clock::now() < expires_at) try {
+    co_await lock.release();
+  } catch (const std::exception&) {} // ignore errors
+
+  // return the spawned coroutine's result
+  co_return state->get();
+}
+
+} // namespace rgw::sync
index 0f99597c21e230d616edc06d1cb63c3feb068e77..4f29c9dcd92f5c12f98a745101448bf2f6069efe 100644 (file)
@@ -185,10 +185,17 @@ add_executable(unittest_rgw_putobj test_rgw_putobj.cc)
 add_ceph_unittest(unittest_rgw_putobj)
 target_link_libraries(unittest_rgw_putobj ${rgw_libs} ${UNITTEST_LIBS})
 
+
 add_executable(unittest_rgw_throttle test_rgw_throttle.cc)
 add_ceph_unittest(unittest_rgw_throttle)
 target_link_libraries(unittest_rgw_throttle ${rgw_libs} ${UNITTEST_LIBS})
 
+add_executable(unittest_rgw_sync_lease test_rgw_sync_lease.cc)
+target_include_directories(unittest_rgw_sync_lease
+  PRIVATE "${CMAKE_SOURCE_DIR}/src/rgw")
+add_ceph_unittest(unittest_rgw_sync_lease)
+target_link_libraries(unittest_rgw_sync_lease ceph-common ${UNITTEST_LIBS})
+
 add_executable(unittest_rgw_iam_policy test_rgw_iam_policy.cc)
 add_ceph_unittest(unittest_rgw_iam_policy)
 target_link_libraries(unittest_rgw_iam_policy
diff --git a/src/test/rgw/test_rgw_sync_lease.cc b/src/test/rgw/test_rgw_sync_lease.cc
new file mode 100644 (file)
index 0000000..d0bd15f
--- /dev/null
@@ -0,0 +1,704 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+//#define BOOST_ASIO_ENABLE_HANDLER_TRACKING
+#include "sync/lease.h"
+
+#include <utility>
+#include <vector>
+#include <gtest/gtest.h>
+#include "common/async/co_waiter.h"
+
+namespace rgw::sync {
+
+using namespace std::chrono_literals;
+
+// injects delays and exceptions into MockClient
+using MockWaiter = ceph::async::co_waiter<void, default_executor>;
+
+enum class event { acquire, renew, release };
+
+// a LockClient that logs events and injects delays and exceptions
+class MockClient : public LockClient {
+  MockWaiter waiter;
+ public:
+  std::vector<event> events; // tracks the sequence of LockClient calls
+
+  awaitable<void> acquire(ceph::timespan) override {
+    events.push_back(event::acquire);
+    co_await waiter.get();
+  }
+  awaitable<void> renew(ceph::timespan) override {
+    events.push_back(event::renew);
+    co_await waiter.get();
+  }
+  awaitable<void> release() override {
+    events.push_back(event::release);
+    co_await waiter.get();
+  }
+
+  void complete(std::exception_ptr eptr) {
+    waiter.complete(eptr);
+  }
+};
+
+template <typename T>
+auto capture(std::optional<T>& opt)
+{
+  return [&opt] (T value) {
+    opt = std::move(value);
+  };
+}
+
+template <typename T>
+auto capture(asio::cancellation_signal& signal, std::optional<T>& opt)
+{
+  return asio::bind_cancellation_slot(signal.slot(), capture(opt));
+}
+
+template <typename ...Args>
+auto capture(std::optional<std::tuple<Args...>>& opt)
+{
+  return [&opt] (Args ...args) {
+    opt = std::forward_as_tuple(std::forward<Args>(args)...);
+  };
+}
+
+
+TEST(with_lease, return_void)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto cr = [] () -> awaitable<void> { co_return; };
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until release blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::release, locker.events.back());
+
+  locker.complete(nullptr); // unblock release
+
+  ctx.poll();
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result);
+}
+
+TEST(with_lease, return_value)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  using ptr = std::unique_ptr<int>; // test a move-only return type
+  auto cr = [] () -> awaitable<ptr> { co_return std::make_unique<int>(42); };
+
+  using result_type = std::tuple<std::exception_ptr, ptr>;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until release blocks
+  EXPECT_FALSE(result);
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::release, locker.events.back());
+
+  locker.complete(nullptr); // unblock release
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(std::get<0>(*result));
+  ASSERT_TRUE(std::get<1>(*result));
+  EXPECT_EQ(42, *std::get<1>(*result));
+}
+
+TEST(with_lease, spawned_exception)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto cr = [] () -> awaitable<void> {
+    throw std::runtime_error{"oops"};
+    co_return;
+  };
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until release blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::release, locker.events.back());
+
+  locker.complete(nullptr); // unblock release
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(with_lease, acquire_exception)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto cr = [] () -> awaitable<void> { co_return; };
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(with_lease, acquire_shutdown)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto cr = [] () -> awaitable<void> { co_return; };
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+  // shut down before acquire completes
+}
+
+TEST(with_lease, acquire_cancel)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto cr = [] () -> awaitable<void> { co_return; };
+
+  asio::cancellation_signal signal;
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(signal, result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  // cancel before acquire completes
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result); // throws on cancellation
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception& e) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(with_lease, acquired_shutdown)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto waiter = MockWaiter{};
+  auto cr = waiter.get(); // cr is a wait that never completes
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // start cr and renewal timer
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  // shut down before renewal timer
+}
+
+TEST(with_lease, acquired_cancel)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto waiter = MockWaiter{};
+  auto cr = waiter.get(); // cr is a wait that never completes
+
+  asio::cancellation_signal signal;
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)),
+                 capture(signal, result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // wait for acquire to finish
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // cancel before renewal timer
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result); // throws on cancellation
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception& e) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(with_lease, renew_exception)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto waiter = MockWaiter{};
+  auto cr = waiter.get(); // cr is a wait that never completes
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until renew timer blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  ctx.run_one(); // wait ~5ms for renew timer
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::renew, locker.events.back());
+
+  // inject an exception on renew
+  locker.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  EXPECT_THROW(std::rethrow_exception(*result), std::runtime_error);
+}
+
+TEST(with_lease, renew_after_timeout)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto waiter = MockWaiter{};
+  auto cr = waiter.get(); // cr is a wait that never completes
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until renew timer blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  ctx.run_one(); // wait ~5ms for renew timer
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::renew, locker.events.back());
+
+  ctx.run_one(); // wait for renew timeout
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  locker.complete(nullptr); // unblock renew
+
+  ctx.poll(); // run until release blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  ASSERT_EQ(3, locker.events.size());
+  EXPECT_EQ(event::release, locker.events.back());
+
+  locker.complete(nullptr); // unblock release
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), boost::system::errc::timed_out);
+  } catch (const std::exception& e) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(with_lease, renew_exception_after_timeout)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto waiter = MockWaiter{};
+  auto cr = waiter.get(); // cr is a wait that never completes
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until renew timer blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  ctx.run_one(); // wait ~5ms for renew timer
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::renew, locker.events.back());
+
+  ctx.run_one(); // wait for renew timeout
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // inject an exception on renew
+  locker.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), boost::system::errc::timed_out);
+  } catch (const std::exception& e) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(with_lease, renew_cancel_after_timeout)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto waiter = MockWaiter{};
+  auto cr = waiter.get(); // cr is a wait that never completes
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::cancellation_signal signal;
+  asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)),
+                 capture(signal, result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until renew timer blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  ctx.run_one(); // wait ~5ms for renew timer
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::renew, locker.events.back());
+
+  ctx.run_one(); // wait for renew timeout
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  // cancel before renew completes
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_EQ(2, locker.events.size()); // no release
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result);
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), boost::system::errc::timed_out);
+  } catch (const std::exception& e) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(with_lease, renew_shutdown)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto waiter = MockWaiter{};
+  auto cr = waiter.get(); // cr is a wait that never completes
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until renew timer blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  ctx.run_one(); // wait ~5ms for renew timer
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::renew, locker.events.back());
+  // shut down before renew completes
+}
+
+TEST(with_lease, renew_cancel)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto waiter = MockWaiter{};
+  auto cr = waiter.get(); // cr is a wait that never completes
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::cancellation_signal signal;
+  asio::co_spawn(ex, with_lease(locker, 10ms, std::move(cr)),
+                 capture(signal, result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until renew timer blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+
+  ctx.run_one(); // wait ~5ms for renew timer
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::renew, locker.events.back());
+
+  // cancel before renew completes
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_TRUE(*result); // throws on cancellation
+  try {
+    std::rethrow_exception(*result);
+  } catch (const boost::system::system_error& e) {
+    EXPECT_EQ(e.code(), asio::error::operation_aborted);
+  } catch (const std::exception& e) {
+    EXPECT_THROW(throw, boost::system::system_error);
+  }
+}
+
+TEST(with_lease, release_exception)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto cr = [] () -> awaitable<void> { co_return; };
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until release blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::release, locker.events.back());
+
+  // inject an exception on release
+  locker.complete(std::make_exception_ptr(std::runtime_error{"oops"}));
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  EXPECT_FALSE(*result); // release exceptions are ignored
+}
+
+TEST(with_lease, release_shutdown)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto cr = [] () -> awaitable<void> { co_return; };
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until release blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::release, locker.events.back());
+  // shut down before release completes
+}
+
+TEST(with_lease, release_cancel)
+{
+  boost::asio::io_context ctx;
+  auto ex = default_executor{ctx.get_executor()};
+  auto locker = MockClient{};
+
+  auto cr = [] () -> awaitable<void> { co_return; };
+
+  using result_type = std::exception_ptr;
+  std::optional<result_type> result;
+  asio::cancellation_signal signal;
+  asio::co_spawn(ex, with_lease(locker, 30s, cr()), capture(signal, result));
+
+  ctx.poll(); // run until acquire blocks
+  ASSERT_FALSE(ctx.stopped());
+  ASSERT_EQ(1, locker.events.size());
+  EXPECT_EQ(event::acquire, locker.events.back());
+
+  locker.complete(nullptr); // unblock acquire
+
+  ctx.poll(); // run until release blocks
+  ASSERT_FALSE(ctx.stopped());
+  EXPECT_FALSE(result);
+  ASSERT_EQ(2, locker.events.size());
+  EXPECT_EQ(event::release, locker.events.back());
+
+  // cancel before release completes
+  signal.emit(asio::cancellation_type::terminal);
+
+  ctx.poll(); // run to completion
+  ASSERT_TRUE(ctx.stopped());
+  ASSERT_TRUE(result);
+  ASSERT_FALSE(*result); // exception is ignored
+}
+
+} // namespace rgw::sync