std::unique_ptr<MPSerializer> RadosObject::get_serializer(const DoutPrefixProvider *dpp, optional_yield y, const std::string& lock_name)
{
- return std::make_unique<MPRadosSerializer>(dpp, store, this, lock_name);
+ return std::make_unique<MPRadosSerializer>(dpp, y, store, this, lock_name);
}
int RadosObject::transition(Bucket* bucket,
ptail_placement_rule, part_num, part_num_str, obj->get_trace());
}
-MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name) :
- lock(lock_name)
+MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, optional_yield y,
+ RadosStore* store, RadosObject* obj,
+ const std::string& lock_name)
+ : dpp(dpp),
+ y(y),
+ lock_state(lock_name),
+ ex(boost::asio::make_strand(store->get_io_context())),
+ timer(ex),
+ cond(ex)
{
rgw_pool meta_pool;
rgw_raw_obj raw_obj;
store->getRados()->open_pool_ctx(dpp, meta_pool, ioctx, true, true);
}
+MPRadosSerializer::~MPRadosSerializer()
+{
+ // stop the renewal coroutine if unlock() wasn't called
+ stop_renewal();
+}
+
+static void renewal(const DoutPrefixProvider* dpp,
+ rgw::sal::MPSerializer& serializer,
+ librados::IoCtx& ioctx,
+ const std::string& oid,
+ rados::cls::lock::Lock lock,
+ auto& timer,
+ ceph::timespan dur,
+ boost::asio::yield_context yield)
+{
+ const ceph::timespan renew_every = dur / 2;
+ lock.set_duration(dur);
+ lock.set_must_renew(true);
+
+ // run the renewal loop until canceled
+ while (yield.get_cancellation_state().cancelled() == boost::asio::cancellation_type::none) {
+ boost::system::error_code ec;
+ timer.expires_after(renew_every);
+ timer.async_wait(yield[ec]);
+ if (ec) {
+ break;
+ }
+
+ librados::ObjectWriteOperation op;
+ op.assert_exists();
+ lock.lock_exclusive(&op);
+ int ret = rgw_rados_operate(dpp, ioctx, oid, std::move(op), yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: MPSerializer lock renewal on "
+ << oid << " failed with " << ret << ". If this upload completes, "
+ "a racing request may overwrite and corrupt it." << dendl;
+ serializer.clear_locked();
+ return;
+ }
+ }
+ ldpp_dout(dpp, 20) << "MPSerializer lock renewal canceled" << dendl;
+}
+
+void MPRadosSerializer::start_renewal(ceph::timespan dur)
+{
+ auto lock = std::lock_guard{mutex};
+ renew_started = true;
+
+ // spawn a cancellable lock renewal coroutine that notifies a condition
+ // variable on completion
+ struct renewal_completion {
+ std::mutex& mutex;
+ ceph::async::async_cond<>& cond;
+ bool& done;
+
+ void operator()(std::exception_ptr) {
+ auto lock = std::unique_lock{mutex};
+ done = true;
+ cond.notify(lock);
+ }
+ };
+ auto completion = renewal_completion{mutex, cond, renew_canceled};
+
+ using namespace boost::asio;
+ spawn(ex,
+ [this, dur] (yield_context yield) {
+ renewal(this->dpp, *this, ioctx, oid, lock_state, timer, dur, yield);
+ }, bind_cancellation_slot(signal.slot(),
+ bind_executor(ex, std::move(completion))));
+}
+
+void MPRadosSerializer::stop_renewal()
+{
+ auto lock = std::unique_lock{mutex};
+ if (!renew_started || // never started
+ renew_canceled) { // already done
+ return;
+ }
+
+ // signal cancellation
+ boost::asio::post(ex, [this] {
+ signal.emit(boost::asio::cancellation_type::terminal); });
+
+ // wait for notification of completion
+ boost::system::error_code ec_ignored;
+ if (y) {
+ cond.async_wait(lock, y.get_yield_context()[ec_ignored]);
+ } else {
+ maybe_warn_about_blocking(dpp);
+ cond.async_wait(lock, ceph::async::use_blocked[ec_ignored]);
+ }
+}
+
int MPRadosSerializer::try_lock(const DoutPrefixProvider *dpp, ceph::timespan dur, optional_yield y)
{
librados::ObjectWriteOperation op;
op.assert_exists();
- lock.set_duration(dur);
- lock.lock_exclusive(&op);
+ lock_state.set_duration(dur);
+ lock_state.set_may_renew(false);
+ lock_state.lock_exclusive(&op);
int ret = rgw_rados_operate(dpp, ioctx, oid, std::move(op), y);
if (! ret) {
locked = true;
+
+ start_renewal(dur);
}
return ret;
}
int MPRadosSerializer::unlock(const DoutPrefixProvider *dpp, optional_yield y)
{
+ // wait for the renewal coroutine to finish so it doesn't race with unlock
+ stop_renewal();
+
librados::ObjectWriteOperation op;
op.assert_exists();
- lock.unlock(&op);
+ lock_state.unlock(&op);
return rgw_rados_operate(dpp, ioctx, oid, std::move(op), y);
}
};
class MPRadosSerializer : public StoreMPSerializer {
+ const DoutPrefixProvider* dpp;
+ optional_yield y; // context of request coroutine/thread
librados::IoCtx ioctx;
- ::rados::cls::lock::Lock lock;
+ ::rados::cls::lock::Lock lock_state;
+
+ // lock renewal state
+ boost::asio::any_io_executor ex; // strand executor for renewal
+ using Timer = boost::asio::basic_waitable_timer<ceph::coarse_mono_clock>;
+ Timer timer;
+ boost::asio::cancellation_signal signal;
+ std::mutex mutex;
+ ceph::async::async_cond<> cond;
+ bool renew_started = false;
+ bool renew_canceled = false;
+ void start_renewal(ceph::timespan dur);
+ void stop_renewal();
public:
- MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name);
+ MPRadosSerializer(const DoutPrefixProvider *dpp, optional_yield y,
+ RadosStore* store, RadosObject* obj, const std::string& lock_name);
+ ~MPRadosSerializer() override;
virtual int try_lock(const DoutPrefixProvider *dpp, ceph::timespan dur, optional_yield y) override;
virtual int unlock(const DoutPrefixProvider* dpp, optional_yield y) override;