From: Casey Bodley Date: Wed, 11 Mar 2026 21:45:55 +0000 (-0400) Subject: rgw/rados: MPRadosSerializer spawns lock renewal coroutine X-Git-Tag: v21.0.0~9^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ab33073f8a21d900f952617e1957dee5a02b2e34;p=ceph.git rgw/rados: MPRadosSerializer spawns lock renewal coroutine CompleteMultipartUpload depends on this lock to ensure consistency of uploads and protect against data loss, so we should try very hard to hold this lock as long as it takes to complete successfully MPRadosSerializer accomplishes this by spawning a background lock renewal coroutine. this coroutine is started during a successful call to try_lock(), and stopped before unlock() releases the lock Fixes: https://tracker.ceph.com/issues/75375 Signed-off-by: Casey Bodley --- diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 1a54761faa2..2fa4095750d 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -3195,7 +3195,7 @@ int RadosObject::chown(User& new_user, const DoutPrefixProvider* dpp, optional_y std::unique_ptr RadosObject::get_serializer(const DoutPrefixProvider *dpp, optional_yield y, const std::string& lock_name) { - return std::make_unique(dpp, store, this, lock_name); + return std::make_unique(dpp, y, store, this, lock_name); } int RadosObject::transition(Bucket* bucket, @@ -4632,8 +4632,15 @@ std::unique_ptr RadosMultipartUpload::get_writer( ptail_placement_rule, part_num, part_num_str, obj->get_trace()); } -MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name) : - lock(lock_name) +MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, optional_yield y, + RadosStore* store, RadosObject* obj, + const std::string& lock_name) + : dpp(dpp), + y(y), + lock_state(lock_name), + ex(boost::asio::make_strand(store->get_io_context())), + timer(ex), + cond(ex) { rgw_pool meta_pool; rgw_raw_obj raw_obj; @@ -4645,24 +4652,123 @@ MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store->getRados()->open_pool_ctx(dpp, meta_pool, ioctx, true, true); } +MPRadosSerializer::~MPRadosSerializer() +{ + // stop the renewal coroutine if unlock() wasn't called + stop_renewal(); +} + +static void renewal(const DoutPrefixProvider* dpp, + rgw::sal::MPSerializer& serializer, + librados::IoCtx& ioctx, + const std::string& oid, + rados::cls::lock::Lock lock, + auto& timer, + ceph::timespan dur, + boost::asio::yield_context yield) +{ + const ceph::timespan renew_every = dur / 2; + lock.set_duration(dur); + lock.set_must_renew(true); + + // run the renewal loop until canceled + while (yield.get_cancellation_state().cancelled() == boost::asio::cancellation_type::none) { + boost::system::error_code ec; + timer.expires_after(renew_every); + timer.async_wait(yield[ec]); + if (ec) { + break; + } + + librados::ObjectWriteOperation op; + op.assert_exists(); + lock.lock_exclusive(&op); + int ret = rgw_rados_operate(dpp, ioctx, oid, std::move(op), yield); + if (ret < 0) { + ldpp_dout(dpp, 0) << "ERROR: MPSerializer lock renewal on " + << oid << " failed with " << ret << ". If this upload completes, " + "a racing request may overwrite and corrupt it." << dendl; + serializer.clear_locked(); + return; + } + } + ldpp_dout(dpp, 20) << "MPSerializer lock renewal canceled" << dendl; +} + +void MPRadosSerializer::start_renewal(ceph::timespan dur) +{ + auto lock = std::lock_guard{mutex}; + renew_started = true; + + // spawn a cancellable lock renewal coroutine that notifies a condition + // variable on completion + struct renewal_completion { + std::mutex& mutex; + ceph::async::async_cond<>& cond; + bool& done; + + void operator()(std::exception_ptr) { + auto lock = std::unique_lock{mutex}; + done = true; + cond.notify(lock); + } + }; + auto completion = renewal_completion{mutex, cond, renew_canceled}; + + using namespace boost::asio; + spawn(ex, + [this, dur] (yield_context yield) { + renewal(this->dpp, *this, ioctx, oid, lock_state, timer, dur, yield); + }, bind_cancellation_slot(signal.slot(), + bind_executor(ex, std::move(completion)))); +} + +void MPRadosSerializer::stop_renewal() +{ + auto lock = std::unique_lock{mutex}; + if (!renew_started || // never started + renew_canceled) { // already done + return; + } + + // signal cancellation + boost::asio::post(ex, [this] { + signal.emit(boost::asio::cancellation_type::terminal); }); + + // wait for notification of completion + boost::system::error_code ec_ignored; + if (y) { + cond.async_wait(lock, y.get_yield_context()[ec_ignored]); + } else { + maybe_warn_about_blocking(dpp); + cond.async_wait(lock, ceph::async::use_blocked[ec_ignored]); + } +} + int MPRadosSerializer::try_lock(const DoutPrefixProvider *dpp, ceph::timespan dur, optional_yield y) { librados::ObjectWriteOperation op; op.assert_exists(); - lock.set_duration(dur); - lock.lock_exclusive(&op); + lock_state.set_duration(dur); + lock_state.set_may_renew(false); + lock_state.lock_exclusive(&op); int ret = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); if (! ret) { locked = true; + + start_renewal(dur); } return ret; } int MPRadosSerializer::unlock(const DoutPrefixProvider *dpp, optional_yield y) { + // wait for the renewal coroutine to finish so it doesn't race with unlock + stop_renewal(); + librados::ObjectWriteOperation op; op.assert_exists(); - lock.unlock(&op); + lock_state.unlock(&op); return rgw_rados_operate(dpp, ioctx, oid, std::move(op), y); } diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 0ad0bf4caf0..6dd8e767571 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -904,11 +904,27 @@ protected: }; class MPRadosSerializer : public StoreMPSerializer { + const DoutPrefixProvider* dpp; + optional_yield y; // context of request coroutine/thread librados::IoCtx ioctx; - ::rados::cls::lock::Lock lock; + ::rados::cls::lock::Lock lock_state; + + // lock renewal state + boost::asio::any_io_executor ex; // strand executor for renewal + using Timer = boost::asio::basic_waitable_timer; + Timer timer; + boost::asio::cancellation_signal signal; + std::mutex mutex; + ceph::async::async_cond<> cond; + bool renew_started = false; + bool renew_canceled = false; + void start_renewal(ceph::timespan dur); + void stop_renewal(); public: - MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name); + MPRadosSerializer(const DoutPrefixProvider *dpp, optional_yield y, + RadosStore* store, RadosObject* obj, const std::string& lock_name); + ~MPRadosSerializer() override; virtual int try_lock(const DoutPrefixProvider *dpp, ceph::timespan dur, optional_yield y) override; virtual int unlock(const DoutPrefixProvider* dpp, optional_yield y) override;