]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/rados: MPRadosSerializer spawns lock renewal coroutine
authorCasey Bodley <cbodley@redhat.com>
Wed, 11 Mar 2026 21:45:55 +0000 (17:45 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 17 Mar 2026 22:52:09 +0000 (18:52 -0400)
CompleteMultipartUpload depends on this lock to ensure consistency of
uploads and protect against data loss, so we should try very hard to
hold this lock as long as it takes to complete successfully

MPRadosSerializer accomplishes this by spawning a background lock
renewal coroutine. this coroutine is started during a successful call to
try_lock(), and stopped before unlock() releases the lock

Fixes: https://tracker.ceph.com/issues/75375
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h

index 1a54761faa2851b0c29f7344d42b9d32285218fa..2fa4095750d8cab8e28650cb6777b0b7cd344bf0 100644 (file)
@@ -3195,7 +3195,7 @@ int RadosObject::chown(User& new_user, const DoutPrefixProvider* dpp, optional_y
 
 std::unique_ptr<MPSerializer> RadosObject::get_serializer(const DoutPrefixProvider *dpp, optional_yield y, const std::string& lock_name)
 {
-  return std::make_unique<MPRadosSerializer>(dpp, store, this, lock_name);
+  return std::make_unique<MPRadosSerializer>(dpp, y, store, this, lock_name);
 }
 
 int RadosObject::transition(Bucket* bucket,
@@ -4632,8 +4632,15 @@ std::unique_ptr<Writer> RadosMultipartUpload::get_writer(
                                 ptail_placement_rule, part_num, part_num_str, obj->get_trace());
 }
 
-MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name) :
-  lock(lock_name)
+MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, optional_yield y,
+                                     RadosStore* store, RadosObject* obj,
+                                     const std::string& lock_name)
+  : dpp(dpp),
+    y(y),
+    lock_state(lock_name),
+    ex(boost::asio::make_strand(store->get_io_context())),
+    timer(ex),
+    cond(ex)
 {
   rgw_pool meta_pool;
   rgw_raw_obj raw_obj;
@@ -4645,24 +4652,123 @@ MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore*
   store->getRados()->open_pool_ctx(dpp, meta_pool, ioctx, true, true);
 }
 
+MPRadosSerializer::~MPRadosSerializer()
+{
+  // stop the renewal coroutine if unlock() wasn't called
+  stop_renewal();
+}
+
+static void renewal(const DoutPrefixProvider* dpp,
+                    rgw::sal::MPSerializer& serializer,
+                    librados::IoCtx& ioctx,
+                    const std::string& oid,
+                    rados::cls::lock::Lock lock,
+                    auto& timer,
+                    ceph::timespan dur,
+                    boost::asio::yield_context yield)
+{
+  const ceph::timespan renew_every = dur / 2;
+  lock.set_duration(dur);
+  lock.set_must_renew(true);
+
+  // run the renewal loop until canceled
+  while (yield.get_cancellation_state().cancelled() == boost::asio::cancellation_type::none) {
+    boost::system::error_code ec;
+    timer.expires_after(renew_every);
+    timer.async_wait(yield[ec]);
+    if (ec) {
+      break;
+    }
+
+    librados::ObjectWriteOperation op;
+    op.assert_exists();
+    lock.lock_exclusive(&op);
+    int ret = rgw_rados_operate(dpp, ioctx, oid, std::move(op), yield);
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: MPSerializer lock renewal on "
+          << oid << " failed with " << ret << ". If this upload completes, "
+          "a racing request may overwrite and corrupt it." << dendl;
+      serializer.clear_locked();
+      return;
+    }
+  }
+  ldpp_dout(dpp, 20) << "MPSerializer lock renewal canceled" << dendl;
+}
+
+void MPRadosSerializer::start_renewal(ceph::timespan dur)
+{
+  auto lock = std::lock_guard{mutex};
+  renew_started = true;
+
+  // spawn a cancellable lock renewal coroutine that notifies a condition
+  // variable on completion
+  struct renewal_completion {
+    std::mutex& mutex;
+    ceph::async::async_cond<>& cond;
+    bool& done;
+
+    void operator()(std::exception_ptr) {
+      auto lock = std::unique_lock{mutex};
+      done = true;
+      cond.notify(lock);
+    }
+  };
+  auto completion = renewal_completion{mutex, cond, renew_canceled};
+
+  using namespace boost::asio;
+  spawn(ex,
+      [this, dur] (yield_context yield) {
+        renewal(this->dpp, *this, ioctx, oid, lock_state, timer, dur, yield);
+      }, bind_cancellation_slot(signal.slot(),
+                                bind_executor(ex, std::move(completion))));
+}
+
+void MPRadosSerializer::stop_renewal()
+{
+  auto lock = std::unique_lock{mutex};
+  if (!renew_started || // never started
+      renew_canceled) { // already done
+    return;
+  }
+
+  // signal cancellation
+  boost::asio::post(ex, [this] {
+      signal.emit(boost::asio::cancellation_type::terminal); });
+
+  // wait for notification of completion
+  boost::system::error_code ec_ignored;
+  if (y) {
+    cond.async_wait(lock, y.get_yield_context()[ec_ignored]);
+  } else {
+    maybe_warn_about_blocking(dpp);
+    cond.async_wait(lock, ceph::async::use_blocked[ec_ignored]);
+  }
+}
+
 int MPRadosSerializer::try_lock(const DoutPrefixProvider *dpp, ceph::timespan dur, optional_yield y)
 {
   librados::ObjectWriteOperation op;
   op.assert_exists();
-  lock.set_duration(dur);
-  lock.lock_exclusive(&op);
+  lock_state.set_duration(dur);
+  lock_state.set_may_renew(false);
+  lock_state.lock_exclusive(&op);
   int ret = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y);
   if (! ret) {
     locked = true;
+
+    start_renewal(dur);
   }
   return ret;
 }
 
 int MPRadosSerializer::unlock(const DoutPrefixProvider *dpp, optional_yield y)
 {
+  // wait for the renewal coroutine to finish so it doesn't race with unlock
+  stop_renewal();
+
   librados::ObjectWriteOperation op;
   op.assert_exists();
-  lock.unlock(&op);
+  lock_state.unlock(&op);
   return rgw_rados_operate(dpp, ioctx, oid, std::move(op), y);
 }
 
index 0ad0bf4caf0a5ed4a6f2a80ecf04a5230489bbdd..6dd8e7675717b308577e5f3768b470b83cb3c5d2 100644 (file)
@@ -904,11 +904,27 @@ protected:
 };
 
 class MPRadosSerializer : public StoreMPSerializer {
+  const DoutPrefixProvider* dpp;
+  optional_yield y; // context of request coroutine/thread
   librados::IoCtx ioctx;
-  ::rados::cls::lock::Lock lock;
+  ::rados::cls::lock::Lock lock_state;
+
+  // lock renewal state
+  boost::asio::any_io_executor ex; // strand executor for renewal
+  using Timer = boost::asio::basic_waitable_timer<ceph::coarse_mono_clock>;
+  Timer timer;
+  boost::asio::cancellation_signal signal;
+  std::mutex mutex;
+  ceph::async::async_cond<> cond;
+  bool renew_started = false;
+  bool renew_canceled = false;
+  void start_renewal(ceph::timespan dur);
+  void stop_renewal();
 
 public:
-  MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name);
+  MPRadosSerializer(const DoutPrefixProvider *dpp, optional_yield y,
+                    RadosStore* store, RadosObject* obj, const std::string& lock_name);
+  ~MPRadosSerializer() override;
 
   virtual int try_lock(const DoutPrefixProvider *dpp, ceph::timespan dur, optional_yield y) override;
   virtual int unlock(const DoutPrefixProvider* dpp, optional_yield y) override;