From 14ddf3c19067bf396079ecb56dfc4e53c8ccd243 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Thu, 26 Jun 2025 13:58:57 -0400 Subject: [PATCH] {neorados,osdc}: Support subsystem cancellation Tag operations with a subsystem so we can cancel them all in one go. Signed-off-by: Adam C. Emerson --- src/include/neorados/RADOS.hpp | 28 ++++++--- src/neorados/RADOS.cc | 22 +++++-- src/osdc/Objecter.cc | 63 +++++++++++++++++-- src/osdc/Objecter.h | 28 ++++++--- .../librados_test_stub/NeoradosTestStub.cc | 6 +- 5 files changed, 118 insertions(+), 29 deletions(-) diff --git a/src/include/neorados/RADOS.hpp b/src/include/neorados/RADOS.hpp index 8986f3948c0..4b595258ca8 100644 --- a/src/include/neorados/RADOS.hpp +++ b/src/include/neorados/RADOS.hpp @@ -1408,26 +1408,29 @@ public: auto execute(Object o, IOContext ioc, ReadOp op, ceph::buffer::list* bl, CompletionToken&& token, uint64_t* objver = nullptr, - const blkin_trace_info* trace_info = nullptr) { + const blkin_trace_info* trace_info = nullptr, + std::uint64_t subsystem = 0) { auto consigned = consign(std::forward(token)); return boost::asio::async_initiate( - [bl, objver, trace_info, this](auto&& handler, Object o, IOContext ioc, - ReadOp op) { + [bl, objver, trace_info, + subsystem, this](auto&& handler, Object o, IOContext ioc, + ReadOp op) { execute_(std::move(o), std::move(ioc), std::move(op), bl, - std::move(handler), objver, trace_info); + std::move(handler), objver, trace_info, subsystem); }, consigned, std::move(o), std::move(ioc), std::move(op)); } template CompletionToken> auto execute(Object o, IOContext ioc, WriteOp op, CompletionToken&& token, uint64_t* objver = nullptr, - const blkin_trace_info* trace_info = nullptr) { + const blkin_trace_info* trace_info = nullptr, + std::uint64_t subsystem = 0) { auto consigned = consign(std::forward(token)); return boost::asio::async_initiate( - [objver, trace_info, this](auto&& handler, Object o, IOContext ioc, - WriteOp op) { + [objver, trace_info, + subsystem, this](auto&& handler, Object o, IOContext ioc, WriteOp op) { execute_(std::move(o), std::move(ioc), std::move(op), - std::move(handler), objver, trace_info); + std::move(handler), objver, trace_info, subsystem); }, consigned, std::move(o), std::move(ioc), std::move(op)); } @@ -1803,6 +1806,9 @@ public: uint64_t instance_id() const; + uint64_t new_subsystem() const; + void cancel_subsystem(uint64_t subsystem) const; + private: RADOS(); @@ -1816,11 +1822,13 @@ private: void execute_(Object o, IOContext ioc, ReadOp op, ceph::buffer::list* bl, Op::Completion c, - uint64_t* objver, const blkin_trace_info* trace_info); + uint64_t* objver, const blkin_trace_info* trace_info, + uint64_t subsystem); void execute_(Object o, IOContext ioc, WriteOp op, Op::Completion c, uint64_t* objver, - const blkin_trace_info* trace_info); + const blkin_trace_info* trace_info, + uint64_t subsystem); void lookup_pool_(std::string name, LookupPoolComp c); void list_pools_(LSPoolsComp c); diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc index ff98d3a9488..41f389aeb18 100644 --- a/src/neorados/RADOS.cc +++ b/src/neorados/RADOS.cc @@ -938,7 +938,8 @@ asio::io_context& RADOS::get_io_context() { void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op, cb::list* bl, ReadOp::Completion c, version_t* objver, - const blkin_trace_info *trace_info) { + const blkin_trace_info *trace_info, + std::uint64_t subsystem) { if (_op.size() == 0) { asio::dispatch(asio::append(std::move(c), bs::error_code{})); return; @@ -957,14 +958,16 @@ void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op, trace.event("init"); impl->objecter->read( *oid, ioc->oloc, std::move(op->op), ioc->snap_seq, bl, flags, - std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace); + std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace, + subsystem); trace.event("submitted"); } void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op, WriteOp::Completion c, version_t* objver, - const blkin_trace_info *trace_info) { + const blkin_trace_info *trace_info, + std::uint64_t subsystem) { if (_op.size() == 0) { asio::dispatch(asio::append(std::move(c), bs::error_code{})); return; @@ -989,7 +992,7 @@ void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op, impl->objecter->mutate( *oid, ioc->oloc, std::move(op->op), ioc->snapc, mtime, flags, - std::move(c), objver, osd_reqid_t{}, &trace); + std::move(c), objver, osd_reqid_t{}, &trace, subsystem); trace.event("submitted"); } @@ -1966,6 +1969,17 @@ uint64_t RADOS::instance_id() const { return impl->get_instance_id(); } +uint64_t RADOS::new_subsystem() const +{ + return impl->objecter->unique_subsystem_id(); +} + +void RADOS::cancel_subsystem(uint64_t subsystem) const +{ + impl->objecter->subsystem_cancel(subsystem, + asio::error::operation_aborted); +} + #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wnon-virtual-dtor" #pragma clang diagnostic push diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 7ba8875683f..2194816eeb8 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -2564,7 +2564,8 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_t ldout(cct, 5) << num_in_flight << " in flight" << dendl; } -int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) +int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r, + bs::error_code ec) { ceph_assert(initialized); @@ -2590,7 +2591,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) Op *op = p->second; if (op->has_completion()) { num_in_flight--; - op->complete(osdcode(r), r, service.get_executor()); + op->complete(ec, r, service.get_executor()); } _op_cancel_map_check(op); _finish_op(op, r); @@ -2599,6 +2600,58 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) return 0; } +void Objecter::subsystem_cancel(uint64_t subsystem, bs::error_code ec) +{ + unique_lock wl(rwlock); + + ldout(cct, 5) << __func__ << ": cancelling subsystem " << subsystem + << " ec=" << ec.message() << dendl; + + auto next_subsystem_op = [subsystem](const decltype(OSDSession::ops)& ops) { + auto i = std::find_if( + ops.begin(), ops.end(), + [subsystem](const auto& el) -> std::optional { + return el.second->subsystem == subsystem; + }); + if (i != ops.cend()) { + return std::make_optional(i->second->tid); + } + return std::optional{std::nullopt}; + }; + + // Since we only do this at shutdown, better to have it be slow than + // make it fast and introduce another cost per op. + +start: + + for (auto siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { + auto s = siter->second; + shared_lock sl(s->lock); + while (auto tid = next_subsystem_op(s->ops)) { + sl.unlock(); + auto ret = op_cancel(s, *tid, ceph::from_error_code(ec), ec); + if (ret == -ENOENT) { + /* oh no! raced, maybe tid moved to another session, restarting */ + goto start; + } + sl.lock(); + } + } + + // Handle case where the op is in homeless session + shared_lock sl(homeless_session->lock); + while (auto tid = next_subsystem_op(homeless_session->ops)) { + sl.unlock(); + auto ret = op_cancel(homeless_session, *tid, ceph::from_error_code(ec), ec); + if (ret == -ENOENT) { + /* oh no! raced, maybe tid moved to another session, restarting */ + goto start; + } + } + sl.unlock(); +} + int Objecter::op_cancel(ceph_tid_t tid, int r) { int ret = 0; @@ -2634,7 +2687,7 @@ start: shared_lock sl(s->lock); if (s->ops.find(tid) != s->ops.end()) { sl.unlock(); - ret = op_cancel(s, tid, r); + ret = op_cancel(s, tid, r, osdcode(r)); if (ret == -ENOENT) { /* oh no! raced, maybe tid moved to another session, restarting */ goto start; @@ -2650,7 +2703,7 @@ start: shared_lock sl(homeless_session->lock); if (homeless_session->ops.find(tid) != homeless_session->ops.end()) { sl.unlock(); - ret = op_cancel(homeless_session, tid, r); + ret = op_cancel(homeless_session, tid, r, osdcode(r)); if (ret == -ENOENT) { /* oh no! raced, maybe tid moved to another session, restarting */ goto start; @@ -2689,7 +2742,7 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool) sl.unlock(); for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) { - int cancel_result = op_cancel(s, *titer, r); + int cancel_result = op_cancel(s, *titer, r, osdcode(r)); // We hold rwlock across search and cancellation, so cancels // should always succeed ceph_assert(cancel_result == 0); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 09413e9c11f..81a85232516 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1713,6 +1713,7 @@ public: private: std::atomic last_tid{0}; + std::atomic last_subsystem{0}; std::atomic inflight_ops{0}; std::atomic client_inc{-1}; uint64_t max_linger_id{0}; @@ -1795,6 +1796,11 @@ public: void maybe_request_map(); void enable_blocklist_events(); + + uint64_t unique_subsystem_id() { + return ++last_subsystem; + } + private: void _maybe_request_map(); @@ -2034,6 +2040,7 @@ public: osd_reqid_t reqid; // explicitly setting reqid ZTracer::Trace trace; + std::uint64_t subsystem = 0; const jspan_context* otel_trace = nullptr; static bool has_completion(decltype(onfinish)& f) { @@ -2065,7 +2072,7 @@ public: Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops, int f, OpComp fin, version_t *ov, int *offset = nullptr, - ZTracer::Trace *parent_trace = nullptr) : + ZTracer::Trace *parent_trace = nullptr, uint64_t subsystem = 0) : target(o, ol, f), ops(std::move(_ops)), out_bl(ops.size(), nullptr), @@ -2074,7 +2081,7 @@ public: out_ec(ops.size(), nullptr), onfinish(std::move(fin)), objver(ov), - data_offset(offset) { + data_offset(offset), subsystem(subsystem) { if (target.base_oloc.key == o) target.base_oloc.key.clear(); if (parent_trace && parent_trace->valid()) { @@ -2085,7 +2092,8 @@ public: Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops, int f, Context* fin, version_t *ov, int *offset = nullptr, - ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr) : + ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr, + uint64_t subsystem = 0) : target(o, ol, f), ops(std::move(_ops)), out_bl(ops.size(), nullptr), @@ -2095,6 +2103,7 @@ public: onfinish(fin), objver(ov), data_offset(offset), + subsystem(subsystem), otel_trace(otel_trace) { if (target.base_oloc.key == o) target.base_oloc.key.clear(); @@ -2932,7 +2941,8 @@ public: /// cancel an in-progress request with the given return code private: - int op_cancel(OSDSession *s, ceph_tid_t tid, int r); + int op_cancel(OSDSession *s, ceph_tid_t tid, int r, + boost::system::error_code ec); int _op_cancel(ceph_tid_t tid, int r); int get_read_flags(int flags) { @@ -2949,6 +2959,7 @@ private: public: int op_cancel(ceph_tid_t tid, int r); + void subsystem_cancel(uint64_t subsystem, boost::system::error_code e); int op_cancel(const std::vector& tidls, int r); /** @@ -3054,10 +3065,10 @@ public: ceph::real_time mtime, int flags, Op::OpComp oncommit, version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(), - ZTracer::Trace *parent_trace = nullptr) { + ZTracer::Trace *parent_trace = nullptr, uint32_t subsystem = 0) { Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver, - nullptr, parent_trace); + nullptr, parent_trace, subsystem); o->priority = op.priority; o->mtime = mtime; o->snapc = snapc; @@ -3125,10 +3136,11 @@ public: ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl, int flags, Op::OpComp onack, version_t *objver = nullptr, int *data_offset = nullptr, - uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) { + uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr, + uint64_t subsystem = 0) { Op *o = new Op(oid, oloc, std::move(op.ops), get_read_flags(flags), std::move(onack), objver, - data_offset, parent_trace); + data_offset, parent_trace, subsystem); o->priority = op.priority; o->snapid = snapid; o->outbl = pbl; diff --git a/src/test/librados_test_stub/NeoradosTestStub.cc b/src/test/librados_test_stub/NeoradosTestStub.cc index fa04bfa40a7..14aaaf26ffe 100644 --- a/src/test/librados_test_stub/NeoradosTestStub.cc +++ b/src/test/librados_test_stub/NeoradosTestStub.cc @@ -584,7 +584,8 @@ boost::asio::io_context::executor_type neorados::RADOS::get_executor() const { void RADOS::execute_(Object o, IOContext ioc, ReadOp op, ceph::buffer::list* bl, Op::Completion c, - uint64_t* objver, const blkin_trace_info* trace_info) { + uint64_t* objver, const blkin_trace_info* trace_info, + uint64_t subsystem) { auto io_ctx = impl->get_io_ctx(ioc); if (io_ctx == nullptr) { asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne)); @@ -603,7 +604,8 @@ void RADOS::execute_(Object o, IOContext ioc, ReadOp op, void RADOS::execute_(Object o, IOContext ioc, WriteOp op, Op::Completion c, uint64_t* objver, - const blkin_trace_info* trace_info) { + const blkin_trace_info* trace_info, + uint64_t subsystem) { auto io_ctx = impl->get_io_ctx(ioc); if (io_ctx == nullptr) { asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne)); -- 2.47.3