]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
{neorados,osdc}: Support subsystem cancellation
authorAdam C. Emerson <aemerson@redhat.com>
Thu, 26 Jun 2025 17:58:57 +0000 (13:58 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 5 Aug 2025 20:29:06 +0000 (16:29 -0400)
Tag operations with a subsystem so we can cancel them all in one go.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/include/neorados/RADOS.hpp
src/neorados/RADOS.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/test/librados_test_stub/NeoradosTestStub.cc

index 8986f3948c032868674c097e36c1187a1a263ad0..4b595258ca80fa859a15c2b9c4b32301e27af2a4 100644 (file)
@@ -1408,26 +1408,29 @@ public:
   auto execute(Object o, IOContext ioc, ReadOp op,
               ceph::buffer::list* bl,
               CompletionToken&& token, uint64_t* objver = nullptr,
-              const blkin_trace_info* trace_info = nullptr) {
+              const blkin_trace_info* trace_info = nullptr,
+              std::uint64_t subsystem = 0) {
     auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
-      [bl, objver, trace_info, this](auto&& handler, Object o, IOContext ioc,
-                                    ReadOp op) {
+      [bl, objver, trace_info,
+       subsystem, this](auto&& handler, Object o, IOContext ioc,
+                       ReadOp op) {
        execute_(std::move(o), std::move(ioc), std::move(op), bl,
-                std::move(handler), objver, trace_info);
+                std::move(handler), objver, trace_info, subsystem);
       }, consigned, std::move(o), std::move(ioc), std::move(op));
   }
 
   template<boost::asio::completion_token_for<Op::Signature> CompletionToken>
   auto execute(Object o, IOContext ioc, WriteOp op,
               CompletionToken&& token, uint64_t* objver = nullptr,
-              const blkin_trace_info* trace_info = nullptr) {
+              const blkin_trace_info* trace_info = nullptr,
+              std::uint64_t subsystem = 0) {
     auto consigned = consign(std::forward<CompletionToken>(token));
     return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
-      [objver, trace_info, this](auto&& handler, Object o, IOContext ioc,
-                                WriteOp op) {
+      [objver, trace_info,
+       subsystem, this](auto&& handler, Object o, IOContext ioc, WriteOp op) {
        execute_(std::move(o), std::move(ioc), std::move(op),
-                std::move(handler), objver, trace_info);
+                std::move(handler), objver, trace_info, subsystem);
       }, consigned, std::move(o), std::move(ioc), std::move(op));
   }
 
@@ -1803,6 +1806,9 @@ public:
 
   uint64_t instance_id() const;
 
+  uint64_t new_subsystem() const;
+  void cancel_subsystem(uint64_t subsystem) const;
+
 private:
 
   RADOS();
@@ -1816,11 +1822,13 @@ private:
 
   void execute_(Object o, IOContext ioc, ReadOp op,
                ceph::buffer::list* bl, Op::Completion c,
-               uint64_t* objver, const blkin_trace_info* trace_info);
+               uint64_t* objver, const blkin_trace_info* trace_info,
+               uint64_t subsystem);
 
   void execute_(Object o, IOContext ioc, WriteOp op,
                Op::Completion c, uint64_t* objver,
-               const blkin_trace_info* trace_info);
+               const blkin_trace_info* trace_info,
+               uint64_t subsystem);
 
   void lookup_pool_(std::string name, LookupPoolComp c);
   void list_pools_(LSPoolsComp c);
index ff98d3a94885e6d837232149f38e73dc2e19d419..41f389aeb181fa8afc6d6c2a71c6c68a22316edd 100644 (file)
@@ -938,7 +938,8 @@ asio::io_context& RADOS::get_io_context() {
 void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op,
                     cb::list* bl,
                     ReadOp::Completion c, version_t* objver,
-                    const blkin_trace_info *trace_info) {
+                    const blkin_trace_info *trace_info,
+                    std::uint64_t subsystem) {
   if (_op.size() == 0) {
     asio::dispatch(asio::append(std::move(c), bs::error_code{}));
     return;
@@ -957,14 +958,16 @@ void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op,
   trace.event("init");
   impl->objecter->read(
     *oid, ioc->oloc, std::move(op->op), ioc->snap_seq, bl, flags,
-    std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace);
+    std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace,
+    subsystem);
 
   trace.event("submitted");
 }
 
 void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op,
                     WriteOp::Completion c, version_t* objver,
-                    const blkin_trace_info *trace_info) {
+                    const blkin_trace_info *trace_info,
+                    std::uint64_t subsystem) {
   if (_op.size() == 0) {
     asio::dispatch(asio::append(std::move(c), bs::error_code{}));
     return;
@@ -989,7 +992,7 @@ void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op,
   impl->objecter->mutate(
     *oid, ioc->oloc, std::move(op->op), ioc->snapc,
     mtime, flags,
-    std::move(c), objver, osd_reqid_t{}, &trace);
+    std::move(c), objver, osd_reqid_t{}, &trace, subsystem);
   trace.event("submitted");
 }
 
@@ -1966,6 +1969,17 @@ uint64_t RADOS::instance_id() const {
   return impl->get_instance_id();
 }
 
+uint64_t RADOS::new_subsystem() const
+{
+  return impl->objecter->unique_subsystem_id();
+}
+
+void RADOS::cancel_subsystem(uint64_t subsystem) const
+{
+  impl->objecter->subsystem_cancel(subsystem,
+                                  asio::error::operation_aborted);
+}
+
 #pragma GCC diagnostic push
 #pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
 #pragma clang diagnostic push
index 7ba8875683f401cfbc0f3608bd2de48d9908ccfe..2194816eeb80039f722f3cb61521ca30e8de5aad 100644 (file)
@@ -2564,7 +2564,8 @@ void Objecter::_op_submit(Op *op, shunique_lock<ceph::shared_mutex>& sul, ceph_t
   ldout(cct, 5) << num_in_flight << " in flight" << dendl;
 }
 
-int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
+int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r,
+                       bs::error_code ec)
 {
   ceph_assert(initialized);
 
@@ -2590,7 +2591,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
   Op *op = p->second;
   if (op->has_completion()) {
     num_in_flight--;
-    op->complete(osdcode(r), r, service.get_executor());
+    op->complete(ec, r, service.get_executor());
   }
   _op_cancel_map_check(op);
   _finish_op(op, r);
@@ -2599,6 +2600,58 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
   return 0;
 }
 
+void Objecter::subsystem_cancel(uint64_t subsystem, bs::error_code ec)
+{
+  unique_lock wl(rwlock);
+
+  ldout(cct, 5) << __func__ << ": cancelling subsystem " << subsystem
+               << " ec=" << ec.message() << dendl;
+
+  auto next_subsystem_op = [subsystem](const decltype(OSDSession::ops)& ops) {
+    auto i = std::find_if(
+      ops.begin(), ops.end(),
+      [subsystem](const auto& el) -> std::optional<ceph_tid_t> {
+       return el.second->subsystem == subsystem;
+      });
+    if (i != ops.cend()) {
+      return std::make_optional(i->second->tid);
+    }
+    return std::optional<ceph_tid_t>{std::nullopt};
+  };
+
+  // Since we only do this at shutdown, better to have it be slow than
+  // make it fast and introduce another cost per op.
+
+start:
+
+  for (auto siter = osd_sessions.begin();
+       siter != osd_sessions.end(); ++siter) {
+    auto s = siter->second;
+    shared_lock sl(s->lock);
+    while (auto tid = next_subsystem_op(s->ops)) {
+      sl.unlock();
+      auto ret = op_cancel(s, *tid, ceph::from_error_code(ec), ec);
+      if (ret == -ENOENT) {
+       /* oh no! raced, maybe tid moved to another session, restarting */
+       goto start;
+      }
+      sl.lock();
+    }
+  }
+
+  // Handle case where the op is in homeless session
+  shared_lock sl(homeless_session->lock);
+  while (auto tid = next_subsystem_op(homeless_session->ops)) {
+    sl.unlock();
+    auto ret = op_cancel(homeless_session, *tid, ceph::from_error_code(ec), ec);
+    if (ret == -ENOENT) {
+      /* oh no! raced, maybe tid moved to another session, restarting */
+      goto start;
+    }
+  }
+  sl.unlock();
+}
+
 int Objecter::op_cancel(ceph_tid_t tid, int r)
 {
   int ret = 0;
@@ -2634,7 +2687,7 @@ start:
     shared_lock sl(s->lock);
     if (s->ops.find(tid) != s->ops.end()) {
       sl.unlock();
-      ret = op_cancel(s, tid, r);
+      ret = op_cancel(s, tid, r, osdcode(r));
       if (ret == -ENOENT) {
        /* oh no! raced, maybe tid moved to another session, restarting */
        goto start;
@@ -2650,7 +2703,7 @@ start:
   shared_lock sl(homeless_session->lock);
   if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
     sl.unlock();
-    ret = op_cancel(homeless_session, tid, r);
+    ret = op_cancel(homeless_session, tid, r, osdcode(r));
     if (ret == -ENOENT) {
       /* oh no! raced, maybe tid moved to another session, restarting */
       goto start;
@@ -2689,7 +2742,7 @@ epoch_t Objecter::op_cancel_writes(int r, int64_t pool)
     sl.unlock();
 
     for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
-      int cancel_result = op_cancel(s, *titer, r);
+      int cancel_result = op_cancel(s, *titer, r, osdcode(r));
       // We hold rwlock across search and cancellation, so cancels
       // should always succeed
       ceph_assert(cancel_result == 0);
index 09413e9c11f379d2f9b69863e68ebd6cf8550cba..81a85232516c53579077bad1c6aaee3445db9662 100644 (file)
@@ -1713,6 +1713,7 @@ public:
 
 private:
   std::atomic<uint64_t> last_tid{0};
+  std::atomic<uint64_t> last_subsystem{0};
   std::atomic<unsigned> inflight_ops{0};
   std::atomic<int> client_inc{-1};
   uint64_t max_linger_id{0};
@@ -1795,6 +1796,11 @@ public:
   void maybe_request_map();
 
   void enable_blocklist_events();
+
+  uint64_t unique_subsystem_id() {
+    return ++last_subsystem;
+  }
+
 private:
 
   void _maybe_request_map();
@@ -2034,6 +2040,7 @@ public:
 
     osd_reqid_t reqid; // explicitly setting reqid
     ZTracer::Trace trace;
+    std::uint64_t subsystem = 0;
     const jspan_context* otel_trace = nullptr;
 
     static bool has_completion(decltype(onfinish)& f) {
@@ -2065,7 +2072,7 @@ public:
 
     Op(const object_t& o, const object_locator_t& ol,  osdc_opvec&& _ops,
        int f, OpComp fin, version_t *ov, int *offset = nullptr,
-       ZTracer::Trace *parent_trace = nullptr) :
+       ZTracer::Trace *parent_trace = nullptr, uint64_t subsystem = 0) :
       target(o, ol, f),
       ops(std::move(_ops)),
       out_bl(ops.size(), nullptr),
@@ -2074,7 +2081,7 @@ public:
       out_ec(ops.size(), nullptr),
       onfinish(std::move(fin)),
       objver(ov),
-      data_offset(offset) {
+      data_offset(offset), subsystem(subsystem) {
       if (target.base_oloc.key == o)
        target.base_oloc.key.clear();
       if (parent_trace && parent_trace->valid()) {
@@ -2085,7 +2092,8 @@ public:
 
     Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
        int f, Context* fin, version_t *ov, int *offset = nullptr,
-       ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr) :
+       ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr,
+       uint64_t subsystem = 0) :
       target(o, ol, f),
       ops(std::move(_ops)),
       out_bl(ops.size(), nullptr),
@@ -2095,6 +2103,7 @@ public:
       onfinish(fin),
       objver(ov),
       data_offset(offset),
+      subsystem(subsystem),
       otel_trace(otel_trace) {
       if (target.base_oloc.key == o)
        target.base_oloc.key.clear();
@@ -2932,7 +2941,8 @@ public:
 
   /// cancel an in-progress request with the given return code
 private:
-  int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
+  int op_cancel(OSDSession *s, ceph_tid_t tid, int r,
+               boost::system::error_code ec);
   int _op_cancel(ceph_tid_t tid, int r);
 
   int get_read_flags(int flags) {
@@ -2949,6 +2959,7 @@ private:
 
 public:
   int op_cancel(ceph_tid_t tid, int r);
+  void subsystem_cancel(uint64_t subsystem, boost::system::error_code e);
   int op_cancel(const std::vector<ceph_tid_t>& tidls, int r);
 
   /**
@@ -3054,10 +3065,10 @@ public:
              ceph::real_time mtime, int flags,
              Op::OpComp oncommit,
              version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
-             ZTracer::Trace *parent_trace = nullptr) {
+             ZTracer::Trace *parent_trace = nullptr, uint32_t subsystem = 0) {
     Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver,
-                  nullptr, parent_trace);
+                  nullptr, parent_trace, subsystem);
     o->priority = op.priority;
     o->mtime = mtime;
     o->snapc = snapc;
@@ -3125,10 +3136,11 @@ public:
            ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
            int flags, Op::OpComp onack,
            version_t *objver = nullptr, int *data_offset = nullptr,
-           uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
+           uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr,
+           uint64_t subsystem = 0) {
     Op *o = new Op(oid, oloc, std::move(op.ops), get_read_flags(flags),
                   std::move(onack), objver,
-                  data_offset, parent_trace);
+                  data_offset, parent_trace, subsystem);
     o->priority = op.priority;
     o->snapid = snapid;
     o->outbl = pbl;
index fa04bfa40a7c352df9154b98a8eaaca44a607a08..14aaaf26ffe63c1a1535d6a102c82268b3621afe 100644 (file)
@@ -584,7 +584,8 @@ boost::asio::io_context::executor_type neorados::RADOS::get_executor() const {
 
 void RADOS::execute_(Object o, IOContext ioc, ReadOp op,
                     ceph::buffer::list* bl, Op::Completion c,
-                    uint64_t* objver, const blkin_trace_info* trace_info) {
+                    uint64_t* objver, const blkin_trace_info* trace_info,
+                    uint64_t subsystem) {
   auto io_ctx = impl->get_io_ctx(ioc);
   if (io_ctx == nullptr) {
     asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));
@@ -603,7 +604,8 @@ void RADOS::execute_(Object o, IOContext ioc, ReadOp op,
 
 void RADOS::execute_(Object o, IOContext ioc, WriteOp op,
                     Op::Completion c, uint64_t* objver,
-                    const blkin_trace_info* trace_info) {
+                    const blkin_trace_info* trace_info,
+                    uint64_t subsystem) {
   auto io_ctx = impl->get_io_ctx(ioc);
   if (io_ctx == nullptr) {
     asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));