From: Adam C. Emerson Date: Wed, 19 Mar 2025 02:47:04 +0000 (-0400) Subject: rgw: Watch/Notify memory leak fix maybe! X-Git-Tag: testing/wip-vshankar-testing-20250407.170244-debug~16^2~2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=7ebc1b8b18474a6fbe06b1e2b1cd3360906ca3a2;p=ceph-ci.git rgw: Watch/Notify memory leak fix maybe! Signed-off-by: Adam C. Emerson --- diff --git a/src/neorados/RADOS.cc b/src/neorados/RADOS.cc index c7eae5e93c3..f0b100bf96d 100644 --- a/src/neorados/RADOS.cc +++ b/src/neorados/RADOS.cc @@ -1373,6 +1373,7 @@ class Notifier : public async::service_list_base_hook { }; asio::io_context::executor_type ex; + Objecter::LingerOp* linger_op; // Zero for unbounded. I would not recommend this. const uint32_t capacity; @@ -1383,14 +1384,18 @@ class Notifier : public async::service_list_base_hook { uint64_t next_id = 0; void service_shutdown() { + if (linger_op) { + linger_op->put(); + } std::unique_lock l(m); handlers.clear(); } public: - Notifier(asio::io_context::executor_type ex, uint32_t capacity) - : ex(ex), capacity(capacity), + Notifier(asio::io_context::executor_type ex, Objecter::LingerOp* linger_op, + uint32_t capacity) + : ex(ex), linger_op(linger_op), capacity(capacity), svc(asio::use_service>( asio::query(ex, boost::asio::execution::context))) { // register for service_shutdown() notifications @@ -1507,7 +1512,11 @@ void RADOS::watch_(Object o, IOContext _ioc, linger_op, op, ioc->snapc, ceph::real_clock::now(), bl, asio::bind_executor( std::move(e), - [c = std::move(c), cookie](bs::error_code e, cb::list) mutable { + [c = std::move(c), cookie, linger_op](bs::error_code e, cb::list) mutable { + if (e) { + linger_op->objecter->linger_cancel(linger_op); + cookie = 0; + } asio::dispatch(asio::append(std::move(c), e, cookie)); }), nullptr); } @@ -1525,7 +1534,7 @@ void RADOS::watch_(Object o, IOContext _ioc, WatchComp c, uint64_t cookie = linger_op->get_cookie(); // Shared pointer to avoid a potential race condition linger_op->user_data.emplace>( - std::make_shared(get_executor(), queue_size)); + std::make_shared(get_executor(), linger_op, queue_size)); auto& n = ceph::any_cast&>( linger_op->user_data); linger_op->handle = std::ref(*n); @@ -1537,7 +1546,12 @@ void RADOS::watch_(Object o, IOContext _ioc, WatchComp c, linger_op, op, ioc->snapc, ceph::real_clock::now(), bl, asio::bind_executor( std::move(e), - [c = std::move(c), cookie](bs::error_code e, cb::list) mutable { + [c = std::move(c), cookie, linger_op](bs::error_code e, cb::list) mutable { + if (e) { + linger_op->user_data.reset(); + linger_op->objecter->linger_cancel(linger_op); + cookie = 0; + } asio::dispatch(asio::append(std::move(c), e, cookie)); }), nullptr); } @@ -1610,9 +1624,7 @@ void RADOS::unwatch_(uint64_t cookie, IOContext _ioc, [objecter = impl->objecter, linger_op, c = std::move(c)] (bs::error_code ec) mutable { - if (!ec) { - objecter->linger_cancel(linger_op); - } + objecter->linger_cancel(linger_op); asio::dispatch(asio::append(std::move(c), ec)); })); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 7a97aab8756..91413b762fe 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -729,7 +729,7 @@ void Objecter::_send_linger_ping(LingerOp *info) Op *o = new Op(info->target.base_oid, info->target.base_oloc, std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ, - fu2::unique_function{CB_Linger_Ping(this, info, now)}, + CB_Linger_Ping(this, info, now), nullptr, nullptr); o->target = info->target; o->should_resend = false; @@ -757,7 +757,7 @@ void Objecter::_linger_ping(LingerOp *info, bs::error_code ec, ceph::coarse_mono ec = _normalize_watch_error(ec); info->last_error = ec; if (info->handle) { - asio::defer(finish_strand, CB_DoWatchError(this, info, ec)); + asio::post(finish_strand, CB_DoWatchError(this, info, ec)); } } } else { diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 36b6f29cb91..5a020848418 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -2063,7 +2063,7 @@ public: } Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops, - int f, OpComp&& fin, version_t *ov, int *offset = nullptr, + int f, OpComp fin, version_t *ov, int *offset = nullptr, ZTracer::Trace *parent_trace = nullptr) : target(o, ol, f), ops(std::move(_ops)), @@ -2103,26 +2103,6 @@ public: } } - Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops, - int f, fu2::unique_function&& fin, version_t *ov, int *offset = nullptr, - ZTracer::Trace *parent_trace = nullptr) : - target(o, ol, f), - ops(std::move(_ops)), - out_bl(ops.size(), nullptr), - out_handler(ops.size()), - out_rval(ops.size(), nullptr), - out_ec(ops.size(), nullptr), - onfinish(std::move(fin)), - objver(ov), - data_offset(offset) { - if (target.base_oloc.key == o) - target.base_oloc.key.clear(); - if (parent_trace && parent_trace->valid()) { - trace.init("op", nullptr, parent_trace); - trace.event("start"); - } - } - bool operator<(const Op& other) const { return tid < other.tid; }