]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Watch/Notify memory leak fix maybe!
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 19 Mar 2025 02:47:04 +0000 (22:47 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 1 Apr 2025 15:10:14 +0000 (11:10 -0400)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/neorados/RADOS.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h

index c7eae5e93c30bff86cc41b60abcddb49dfbb0784..f0b100bf96da677b6fec2011d0d3fbad1fd850e8 100644 (file)
@@ -1373,6 +1373,7 @@ class Notifier : public async::service_list_base_hook {
   };
 
   asio::io_context::executor_type ex;
+  Objecter::LingerOp* linger_op;
   // Zero for unbounded. I would not recommend this.
   const uint32_t capacity;
 
@@ -1383,14 +1384,18 @@ class Notifier : public async::service_list_base_hook {
   uint64_t next_id = 0;
 
   void service_shutdown() {
+    if (linger_op) {
+      linger_op->put();
+    }
     std::unique_lock l(m);
     handlers.clear();
   }
 
 public:
 
-  Notifier(asio::io_context::executor_type ex, uint32_t capacity)
-    : ex(ex), capacity(capacity),
+  Notifier(asio::io_context::executor_type ex, Objecter::LingerOp* linger_op,
+          uint32_t capacity)
+    : ex(ex), linger_op(linger_op), capacity(capacity),
       svc(asio::use_service<async::service<Notifier>>(
            asio::query(ex, boost::asio::execution::context))) {
     // register for service_shutdown() notifications
@@ -1507,7 +1512,11 @@ void RADOS::watch_(Object o, IOContext _ioc,
     linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
     asio::bind_executor(
       std::move(e),
-      [c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
+      [c = std::move(c), cookie, linger_op](bs::error_code e, cb::list) mutable {
+       if (e) {
+         linger_op->objecter->linger_cancel(linger_op);
+         cookie = 0;
+       }
        asio::dispatch(asio::append(std::move(c), e, cookie));
       }), nullptr);
 }
@@ -1525,7 +1534,7 @@ void RADOS::watch_(Object o, IOContext _ioc, WatchComp c,
   uint64_t cookie = linger_op->get_cookie();
   // Shared pointer to avoid a potential race condition
   linger_op->user_data.emplace<std::shared_ptr<Notifier>>(
-    std::make_shared<Notifier>(get_executor(), queue_size));
+    std::make_shared<Notifier>(get_executor(), linger_op, queue_size));
   auto& n = ceph::any_cast<std::shared_ptr<Notifier>&>(
     linger_op->user_data);
   linger_op->handle = std::ref(*n);
@@ -1537,7 +1546,12 @@ void RADOS::watch_(Object o, IOContext _ioc, WatchComp c,
     linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
     asio::bind_executor(
       std::move(e),
-      [c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
+      [c = std::move(c), cookie, linger_op](bs::error_code e, cb::list) mutable {
+       if (e) {
+         linger_op->user_data.reset();
+         linger_op->objecter->linger_cancel(linger_op);
+         cookie = 0;
+       }
        asio::dispatch(asio::append(std::move(c), e, cookie));
       }), nullptr);
 }
@@ -1610,9 +1624,7 @@ void RADOS::unwatch_(uint64_t cookie, IOContext _ioc,
                           [objecter = impl->objecter,
                            linger_op, c = std::move(c)]
                           (bs::error_code ec) mutable {
-                            if (!ec) {
-                              objecter->linger_cancel(linger_op);
-                            }
+                            objecter->linger_cancel(linger_op);
                             asio::dispatch(asio::append(std::move(c), ec));
                           }));
 }
index 7a97aab8756c9eaad2fe49a906a7f134bc8a8087..91413b762fe8c5fe1627e1d97e42a31c51bc4ab9 100644 (file)
@@ -729,7 +729,7 @@ void Objecter::_send_linger_ping(LingerOp *info)
 
   Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
-                fu2::unique_function<Op::OpSig>{CB_Linger_Ping(this, info, now)},
+                CB_Linger_Ping(this, info, now),
                 nullptr, nullptr);
   o->target = info->target;
   o->should_resend = false;
@@ -757,7 +757,7 @@ void Objecter::_linger_ping(LingerOp *info, bs::error_code ec, ceph::coarse_mono
       ec = _normalize_watch_error(ec);
       info->last_error = ec;
       if (info->handle) {
-       asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
+       asio::post(finish_strand, CB_DoWatchError(this, info, ec));
       }
     }
   } else {
index 36b6f29cb916461a48c3ba39aac93c5c2ab4b6fe..5a0208484189bc807381df2ac6c6c87681935179 100644 (file)
@@ -2063,7 +2063,7 @@ public:
     }
 
     Op(const object_t& o, const object_locator_t& ol,  osdc_opvec&& _ops,
-       int f, OpComp&& fin, version_t *ov, int *offset = nullptr,
+       int f, OpComp fin, version_t *ov, int *offset = nullptr,
        ZTracer::Trace *parent_trace = nullptr) :
       target(o, ol, f),
       ops(std::move(_ops)),
@@ -2103,26 +2103,6 @@ public:
       }
     }
 
-    Op(const object_t& o, const object_locator_t& ol, osdc_opvec&&  _ops,
-       int f, fu2::unique_function<OpSig>&& fin, version_t *ov, int *offset = nullptr,
-       ZTracer::Trace *parent_trace = nullptr) :
-      target(o, ol, f),
-      ops(std::move(_ops)),
-      out_bl(ops.size(), nullptr),
-      out_handler(ops.size()),
-      out_rval(ops.size(), nullptr),
-      out_ec(ops.size(), nullptr),
-      onfinish(std::move(fin)),
-      objver(ov),
-      data_offset(offset) {
-      if (target.base_oloc.key == o)
-       target.base_oloc.key.clear();
-      if (parent_trace && parent_trace->valid()) {
-        trace.init("op", nullptr, parent_trace);
-        trace.event("start");
-      }
-    }
-
     bool operator<(const Op& other) const {
       return tid < other.tid;
     }