From: Adam C. Emerson Date: Thu, 26 Jun 2025 17:58:57 +0000 (-0400) Subject: {neorados,osdc}: Support subsystem cancellation X-Git-Tag: v20.1.1~22^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2ae6e0a43f5ad7a1d383650e9a68ae7c6d2e5e02;p=ceph.git {neorados,osdc}: Support subsystem cancellation Tag operations with a subsystem so we can cancel them all in one go. Signed-off-by: Adam C. Emerson (cherry picked from commit 2526eb573b789b33b7d9ebf1169491f13e2318bb) Signed-off-by: Adam C. Emerson --- diff --git a/src/include/neorados/RADOS.hpp b/src/include/neorados/RADOS.hpp index d28be2935ed5..a3ff6f8abefb 100644 --- a/src/include/neorados/RADOS.hpp +++ b/src/include/neorados/RADOS.hpp @@ -1404,26 +1404,29 @@ public: auto execute(Object o, IOContext ioc, ReadOp op, ceph::buffer::list* bl, CompletionToken&& token, uint64_t* objver = nullptr, - const blkin_trace_info* trace_info = nullptr) { + const blkin_trace_info* trace_info = nullptr, + std::uint64_t subsystem = 0) { auto consigned = consign(std::forward(token)); return boost::asio::async_initiate( - [bl, objver, trace_info, this](auto&& handler, Object o, IOContext ioc, - ReadOp op) { + [bl, objver, trace_info, + subsystem, this](auto&& handler, Object o, IOContext ioc, + ReadOp op) { execute_(std::move(o), std::move(ioc), std::move(op), bl, - std::move(handler), objver, trace_info); + std::move(handler), objver, trace_info, subsystem); }, consigned, std::move(o), std::move(ioc), std::move(op)); } template CompletionToken> auto execute(Object o, IOContext ioc, WriteOp op, CompletionToken&& token, uint64_t* objver = nullptr, - const blkin_trace_info* trace_info = nullptr) { + const blkin_trace_info* trace_info = nullptr, + std::uint64_t subsystem = 0) { auto consigned = consign(std::forward(token)); return boost::asio::async_initiate( - [objver, trace_info, this](auto&& handler, Object o, IOContext ioc, - WriteOp op) { + [objver, trace_info, + subsystem, this](auto&& handler, Object o, IOContext ioc, WriteOp op) { execute_(std::move(o), std::move(ioc), std::move(op), - std::move(handler), objver, trace_info); + std::move(handler), objver, trace_info, subsystem); }, consigned, std::move(o), std::move(ioc), std::move(op)); } @@ -1799,6 +1802,9 @@ public: uint64_t instance_id() const; + uint64_t new_subsystem() const; + void cancel_subsystem(uint64_t subsystem) const; + private: RADOS(); @@ -1812,11 +1818,13 @@ private: void execute_(Object o, IOContext ioc, ReadOp op, ceph::buffer::list* bl, Op::Completion c, - uint64_t* objver, const blkin_trace_info* trace_info); + uint64_t* objver, const blkin_trace_info* trace_info, + uint64_t subsystem); void execute_(Object o, IOContext ioc, WriteOp op, Op::Completion c, uint64_t* objver, - const blkin_trace_info* trace_info); + const blkin_trace_info* trace_info, + uint64_t subsystem); void lookup_pool_(std::string name, LookupPoolComp c); void list_pools_(LSPoolsComp c); diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc index ea96be487d28..4dcba611907f 100644 --- a/src/neorados/RADOS.cc +++ b/src/neorados/RADOS.cc @@ -938,7 +938,8 @@ asio::io_context& RADOS::get_io_context() { void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op, cb::list* bl, ReadOp::Completion c, version_t* objver, - const blkin_trace_info *trace_info) { + const blkin_trace_info *trace_info, + std::uint64_t subsystem) { if (_op.size() == 0) { asio::dispatch(asio::append(std::move(c), bs::error_code{})); return; @@ -957,14 +958,16 @@ void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op, trace.event("init"); impl->objecter->read( *oid, ioc->oloc, std::move(op->op), ioc->snap_seq, bl, flags, - std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace); + std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace, + subsystem); trace.event("submitted"); } void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op, WriteOp::Completion c, version_t* objver, - const blkin_trace_info *trace_info) { + const blkin_trace_info *trace_info, + std::uint64_t subsystem) { if (_op.size() == 0) { asio::dispatch(asio::append(std::move(c), bs::error_code{})); return; @@ -989,7 +992,7 @@ void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op, impl->objecter->mutate( *oid, ioc->oloc, std::move(op->op), ioc->snapc, mtime, flags, - std::move(c), objver, osd_reqid_t{}, &trace); + std::move(c), objver, osd_reqid_t{}, &trace, subsystem); trace.event("submitted"); } @@ -1967,6 +1970,17 @@ uint64_t RADOS::instance_id() const { return impl->get_instance_id(); } +uint64_t RADOS::new_subsystem() const +{ + return impl->objecter->unique_subsystem_id(); +} + +void RADOS::cancel_subsystem(uint64_t subsystem) const +{ + impl->objecter->subsystem_cancel(subsystem, + asio::error::operation_aborted); +} + #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wnon-virtual-dtor" #pragma clang diagnostic push diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 4fb94549242e..32eb025074c7 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -2568,7 +2568,8 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_t ldout(cct, 5) << num_in_flight << " in flight" << dendl; } -int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) +int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r, + bs::error_code ec) { ceph_assert(initialized); @@ -2594,7 +2595,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) Op *op = p->second; if (op->has_completion()) { num_in_flight--; - op->complete(osdcode(r), r, service.get_executor()); + op->complete(ec, r, service.get_executor()); } _op_cancel_map_check(op); _finish_op(op, r); @@ -2603,6 +2604,58 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) return 0; } +void Objecter::subsystem_cancel(uint64_t subsystem, bs::error_code ec) +{ + unique_lock wl(rwlock); + + ldout(cct, 5) << __func__ << ": cancelling subsystem " << subsystem + << " ec=" << ec.message() << dendl; + + auto next_subsystem_op = [subsystem](const decltype(OSDSession::ops)& ops) { + auto i = std::find_if( + ops.begin(), ops.end(), + [subsystem](const auto& el) -> std::optional { + return el.second->subsystem == subsystem; + }); + if (i != ops.cend()) { + return std::make_optional(i->second->tid); + } + return std::optional{std::nullopt}; + }; + + // Since we only do this at shutdown, better to have it be slow than + // make it fast and introduce another cost per op. + +start: + + for (auto siter = osd_sessions.begin(); + siter != osd_sessions.end(); ++siter) { + auto s = siter->second; + shared_lock sl(s->lock); + while (auto tid = next_subsystem_op(s->ops)) { + sl.unlock(); + auto ret = op_cancel(s, *tid, ceph::from_error_code(ec), ec); + if (ret == -ENOENT) { + /* oh no! raced, maybe tid moved to another session, restarting */ + goto start; + } + sl.lock(); + } + } + + // Handle case where the op is in homeless session + shared_lock sl(homeless_session->lock); + while (auto tid = next_subsystem_op(homeless_session->ops)) { + sl.unlock(); + auto ret = op_cancel(homeless_session, *tid, ceph::from_error_code(ec), ec); + if (ret == -ENOENT) { + /* oh no! raced, maybe tid moved to another session, restarting */ + goto start; + } + } + sl.unlock(); +} + int Objecter::op_cancel(ceph_tid_t tid, int r) { int ret = 0; @@ -2638,7 +2691,7 @@ start: shared_lock sl(s->lock); if (s->ops.find(tid) != s->ops.end()) { sl.unlock(); - ret = op_cancel(s, tid, r); + ret = op_cancel(s, tid, r, osdcode(r)); if (ret == -ENOENT) { /* oh no! raced, maybe tid moved to another session, restarting */ goto start; @@ -2654,7 +2707,7 @@ start: shared_lock sl(homeless_session->lock); if (homeless_session->ops.find(tid) != homeless_session->ops.end()) { sl.unlock(); - ret = op_cancel(homeless_session, tid, r); + ret = op_cancel(homeless_session, tid, r, osdcode(r)); if (ret == -ENOENT) { /* oh no! raced, maybe tid moved to another session, restarting */ goto start; @@ -2693,7 +2746,7 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool) sl.unlock(); for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) { - int cancel_result = op_cancel(s, *titer, r); + int cancel_result = op_cancel(s, *titer, r, osdcode(r)); // We hold rwlock across search and cancellation, so cancels // should always succeed ceph_assert(cancel_result == 0); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 12b66193a17b..51f1687fc493 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1714,6 +1714,7 @@ public: private: std::atomic last_tid{0}; + std::atomic last_subsystem{0}; std::atomic inflight_ops{0}; std::atomic client_inc{-1}; uint64_t max_linger_id{0}; @@ -1796,6 +1797,11 @@ public: void maybe_request_map(); void enable_blocklist_events(); + + uint64_t unique_subsystem_id() { + return ++last_subsystem; + } + private: void _maybe_request_map(); @@ -2035,6 +2041,7 @@ public: osd_reqid_t reqid; // explicitly setting reqid ZTracer::Trace trace; + std::uint64_t subsystem = 0; const jspan_context* otel_trace = nullptr; static bool has_completion(decltype(onfinish)& f) { @@ -2066,7 +2073,7 @@ public: Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops, int f, OpComp fin, version_t *ov, int *offset = nullptr, - ZTracer::Trace *parent_trace = nullptr) : + ZTracer::Trace *parent_trace = nullptr, uint64_t subsystem = 0) : target(o, ol, f), ops(std::move(_ops)), out_bl(ops.size(), nullptr), @@ -2075,7 +2082,7 @@ public: out_ec(ops.size(), nullptr), onfinish(std::move(fin)), objver(ov), - data_offset(offset) { + data_offset(offset), subsystem(subsystem) { if (target.base_oloc.key == o) target.base_oloc.key.clear(); if (parent_trace && parent_trace->valid()) { @@ -2086,7 +2093,8 @@ public: Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops, int f, Context* fin, version_t *ov, int *offset = nullptr, - ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr) : + ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr, + uint64_t subsystem = 0) : target(o, ol, f), ops(std::move(_ops)), out_bl(ops.size(), nullptr), @@ -2096,6 +2104,7 @@ public: onfinish(fin), objver(ov), data_offset(offset), + subsystem(subsystem), otel_trace(otel_trace) { if (target.base_oloc.key == o) target.base_oloc.key.clear(); @@ -2933,7 +2942,8 @@ public: /// cancel an in-progress request with the given return code private: - int op_cancel(OSDSession *s, ceph_tid_t tid, int r); + int op_cancel(OSDSession *s, ceph_tid_t tid, int r, + boost::system::error_code ec); int _op_cancel(ceph_tid_t tid, int r); int get_read_flags(int flags) { @@ -2950,6 +2960,7 @@ private: public: int op_cancel(ceph_tid_t tid, int r); + void subsystem_cancel(uint64_t subsystem, boost::system::error_code e); int op_cancel(const std::vector& tidls, int r); /** @@ -3055,10 +3066,10 @@ public: ceph::real_time mtime, int flags, Op::OpComp oncommit, version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(), - ZTracer::Trace *parent_trace = nullptr) { + ZTracer::Trace *parent_trace = nullptr, uint32_t subsystem = 0) { Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags | CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver, - nullptr, parent_trace); + nullptr, parent_trace, subsystem); o->priority = op.priority; o->mtime = mtime; o->snapc = snapc; @@ -3114,10 +3125,11 @@ public: ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl, int flags, Op::OpComp onack, version_t *objver = nullptr, int *data_offset = nullptr, - uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) { + uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr, + uint64_t subsystem = 0) { Op *o = new Op(oid, oloc, std::move(op.ops), get_read_flags(flags), std::move(onack), objver, - data_offset, parent_trace); + data_offset, parent_trace, subsystem); o->priority = op.priority; o->snapid = snapid; o->outbl = pbl; diff --git a/src/test/librados_test_stub/NeoradosTestStub.cc b/src/test/librados_test_stub/NeoradosTestStub.cc index fa04bfa40a7c..14aaaf26ffe6 100644 --- a/src/test/librados_test_stub/NeoradosTestStub.cc +++ b/src/test/librados_test_stub/NeoradosTestStub.cc @@ -584,7 +584,8 @@ boost::asio::io_context::executor_type neorados::RADOS::get_executor() const { void RADOS::execute_(Object o, IOContext ioc, ReadOp op, ceph::buffer::list* bl, Op::Completion c, - uint64_t* objver, const blkin_trace_info* trace_info) { + uint64_t* objver, const blkin_trace_info* trace_info, + uint64_t subsystem) { auto io_ctx = impl->get_io_ctx(ioc); if (io_ctx == nullptr) { asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne)); @@ -603,7 +604,8 @@ void RADOS::execute_(Object o, IOContext ioc, ReadOp op, void RADOS::execute_(Object o, IOContext ioc, WriteOp op, Op::Completion c, uint64_t* objver, - const blkin_trace_info* trace_info) { + const blkin_trace_info* trace_info, + uint64_t subsystem) { auto io_ctx = impl->get_io_ctx(ioc); if (io_ctx == nullptr) { asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));