From: Casey Bodley Date: Wed, 26 Mar 2025 21:57:55 +0000 (-0400) Subject: librados/asio: add async_watch/unwatch() X-Git-Tag: testing/wip-vshankar-testing-20250411.090237-debug~22^2~4 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=27168f00aa55157a917e89d0ddddb2a09b05eff8;p=ceph-ci.git librados/asio: add async_watch/unwatch() Signed-off-by: Casey Bodley --- diff --git a/src/librados/librados_asio.h b/src/librados/librados_asio.h index 00a82f28517..26e4fe2fa2c 100644 --- a/src/librados/librados_asio.h +++ b/src/librados/librados_asio.h @@ -268,6 +268,65 @@ auto async_operate(IoExecutor ex, IoCtx& io, const std::string& oid, }, token, ex, io, oid, std::move(write_op), flags, trace_ctx); } +/// Calls IoCtx::aio_watch2() and arranges for the AioCompletion to call a +/// given handler with signature (error_code, version_t). +/// +/// The given IoCtx reference is not required to remain valid, but some IoCtx +/// instance must preserve its underlying implementation until completion. +template +auto async_watch(IoExecutor ex, IoCtx& io, const std::string& oid, + uint64_t* handle, librados::WatchCtx2* ctx, + uint32_t timeout_ms, CompletionToken &&token) +{ + using Op = detail::AsyncOp; + using Signature = typename Op::Signature; + return boost::asio::async_initiate( + [] (auto handler, IoExecutor ex, const IoCtx& i, const std::string& oid, + uint64_t* handle, librados::WatchCtx2* ctx, uint32_t timeout_ms) { + constexpr bool is_read = false; + auto p = Op::create(ex, is_read, std::move(handler)); + auto& op = p->user_data; + + IoCtx& io = const_cast(i); + int ret = io.aio_watch2(oid, op.aio_completion.get(), + handle, ctx, timeout_ms); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec, 0); + } else { + p.release(); // release ownership until completion + } + }, token, ex, io, oid, handle, ctx, timeout_ms); +} + +/// Calls IoCtx::aio_unwatch() and arranges for the AioCompletion to call a +/// given handler with signature (error_code, version_t). +/// +/// The given IoCtx reference is not required to remain valid, but some IoCtx +/// instance must preserve its underlying implementation until completion. +template +auto async_unwatch(IoExecutor ex, IoCtx& io, uint64_t handle, + CompletionToken &&token) +{ + using Op = detail::AsyncOp; + using Signature = typename Op::Signature; + return boost::asio::async_initiate( + [] (auto handler, IoExecutor ex, const IoCtx& i, uint64_t handle) { + constexpr bool is_read = false; + auto p = Op::create(ex, is_read, std::move(handler)); + auto& op = p->user_data; + + IoCtx& io = const_cast(i); + int ret = io.aio_unwatch(handle, op.aio_completion.get()); + if (ret < 0) { + auto ec = boost::system::error_code{-ret, librados::detail::err_category()}; + ceph::async::post(std::move(p), ec, 0); + } else { + p.release(); // release ownership until completion + } + }, token, ex, io, handle); +} + /// Calls IoCtx::aio_notify() and arranges for the AioCompletion to call a /// given handler with signature (error_code, version_t, bufferlist). ///