unittest_backend_basics and unittest_ecfailover_with_peering fail under
ASan with megabytes of leaks per run, all originating in MockMessenger
and the listeners that feed into it. A representative LSan stack:
Indirect leak of 2342912 byte(s) in 572 object(s) allocated from:
#1 ceph::buffer::raw_combined::alloc_data_n_controlblock src/common/buffer.cc:114
#7 MOSDPGLease::encode_payload src/messages/MOSDPGLease.h:54
#8 Message::encode src/msg/Message.cc:242
#9 MockMessenger::send_message src/test/osd/MockMessenger.h:159
#10 MockPeeringListener::send_cluster_message src/test/osd/MockPeeringListener.h:193
#11 PeeringState::send_lease src/osd/PeeringState.cc:1305
Refcount (add_ref/consume) mismatches at three call sites conspire to
leak the sender Message, the decoded receiver Message, and its
MockConnection:
- MockPGBackendListener::send_message receives a raw `m` with its +1
from `new T(...)` and wraps it in `MessageRef(m)` with the default
add_ref=true, bumping instead of consuming the +1. Switch to
add_ref=false.
- MockPeeringListener::send_cluster_message detaches an intrusive_ptr
into a raw pointer and then hands it to MockMessenger, which bumps
with add_ref=true; the +1 transferred via detach() is never
consumed. Pass `m.get()` so the caller's intrusive_ptr keeps
ownership and releases on scope exit.
- MockMessenger's receiver lambda never releases the +1 that
decode_message() returns (the Ceph convention), and it passes
`new MockConnection(from_osd)` into a ConnectionRef constructed with
the default add_ref=true, so the +1 from `new` leaks too. Construct
the ConnectionRef with add_ref=false, and wrap the decoded Message in
a `MessageRef{decoded_msg, /*add_ref=*/false}` so the smart pointer
releases the +1 on every exit path.
While at it, switch the handler API from raw `Message*` / `MsgType*`
to `MessageRef` / `boost::intrusive_ptr<MsgType>` so lifetime is
managed by the smart pointer end-to-end. A handler whose consumer takes
ownership of a raw pointer calls `m.detach()` to transfer the +1 to
that pointer. For example, OpRequest stores `Message*` and put()s it in
its destructor, so PGBackendTestFixture.cc now passes `m.detach()` into
`create_request<OpRequest, Message*>(...)` in place of an explicit
`intrusive_ptr_add_ref(m)` followed by `m`.
The peering handler in ECPeeringTestFixture.cc does not store the
Message; it just lets the typed intrusive_ptr drop on scope exit.
Signed-off-by: Kefu Chai <k.chai@proxmox.com>
// Register handlers for peering messages (MOSDPeeringOp)
// All peering messages (Query, Notify, Info, Log) use the same handler pattern
// since they all inherit from MOSDPeeringOp and use get_event()
- auto peering_handler = [this](int from_osd, int to_osd, MOSDPeeringOp* op) -> bool {
- // Message is already correctly typed as MOSDPeeringOp*
+ auto peering_handler = [this](int from_osd, int to_osd,
+ boost::intrusive_ptr<MOSDPeeringOp> op) -> bool {
+ // Message is already correctly typed as MOSDPeeringOp. The
+ // intrusive_ptr keeps the message alive across this handler;
+ // releasing happens automatically when `op` goes out of scope.
ceph_assert(op);
-
+
// Get the peering event from the message
PGPeeringEventRef evt_ref(op->get_event());
*/
class MockMessenger {
public:
- using MessageHandler = std::function<bool(int from_osd, int to_osd, Message*)>;
+ using MessageHandler = std::function<bool(int from_osd, int to_osd, MessageRef)>;
using EpochGetter = std::function<epoch_t(int osd)>;
private:
/**
* Register a type-safe handler for a specific message type.
* Uses C++20 concepts to ensure type safety at compile time.
- * The handler receives the correctly-typed message pointer.
+ * The handler receives a typed intrusive_ptr; lifetime is managed by
+ * the smart pointer. If the handler wants to extend the message's
+ * lifetime (e.g. by storing it in an OpRequest that consumes a
+ * refcount), it can call `.detach()` to transfer ownership without
+ * touching the refcount manually.
*
* @tparam MsgType The specific message type (e.g., MOSDECSubOpWrite)
* @param msg_type The message type code (e.g., MSG_OSD_EC_WRITE)
- * @param handler Lambda that accepts (from_osd, to_osd, MsgType*) and returns true if handled
+ * @param handler Lambda that accepts (from_osd, to_osd,
+ * boost::intrusive_ptr<MsgType>) and returns true if handled
*/
template<typename MsgType>
requires std::derived_from<MsgType, Message>
- void register_typed_handler(int msg_type, std::function<bool(int, int, MsgType*)> handler) {
+ void register_typed_handler(
+ int msg_type,
+ std::function<bool(int, int, boost::intrusive_ptr<MsgType>)> handler) {
// Wrap the typed handler in a generic handler that performs the cast
- register_handler(msg_type, [handler](int from_osd, int to_osd, Message* m) -> bool {
- MsgType* typed_msg = dynamic_cast<MsgType*>(m);
- if (!typed_msg) {
- return false; // Wrong type, let other handlers try
- }
- return handler(from_osd, to_osd, typed_msg);
- });
+ register_handler(msg_type,
+ [handler](int from_osd, int to_osd, MessageRef m) -> bool {
+ auto typed = boost::dynamic_pointer_cast<MsgType>(m);
+ if (!typed) {
+ return false; // Wrong type, let other handlers try
+ }
+ return handler(from_osd, to_osd, std::move(typed));
+ });
}
/**
// Decode the message components into a new message object
// This uses the proper decode_message() function that real messengers use,
- // which supports the new footer format with message authentication
+ // which supports the new footer format with message authentication.
+ // ConnectionRef takes the new MockConnection with add_ref=false so the
+ // +1 from `new` is consumed exactly once (decode_message moves the
+ // ConnectionRef into the Message; the Message destructor releases it).
+ Message::ConnectionRef con{new MockConnection(from_osd), /*add_ref=*/false};
Message *decoded_msg = decode_message(g_ceph_context, 0, header, footer,
- mf.front(), mf.middle(), mf.data(), new MockConnection(from_osd));
+ mf.front(), mf.middle(), mf.data(), con);
ceph_assert(decoded_msg);
-
+ // Wrap decode_message's +1 in a MessageRef so the Message's lifetime
+ // is managed automatically: the ref drops when this lambda returns,
+ // which releases the +1. Handlers that want to extend lifetime
+ // receive the MessageRef by value (or a typed intrusive_ptr) and can
+ // .detach() to transfer ownership cleanly.
+ MessageRef decoded{decoded_msg, /*add_ref=*/false};
+
// Try specific handler first, then catch-all handler (-1)
if (!handlers.contains(header.type)) {
std::cerr << "ERROR: No handler registered for message type " << header.type
std::cerr << std::endl;
}
ceph_assert(handlers.contains(header.type));
- ceph_assert(handlers.at(header.type)(from_osd, to_osd, decoded_msg));
+ ceph_assert(handlers.at(header.type)(from_osd, to_osd, decoded));
});
}
// Routes messages through MockMessenger for asynchronous message processing.
void send_message(int to_osd, Message *m) override {
- MessageRef mref(m);
+ // Callers that pass `new T(...)` hand off a +1 refcount; consume it here
+ // with add_ref=false so the original +1 is owned by `mref`/the storage
+ // vectors and is correctly released when those go out of scope.
+ MessageRef mref(m, /*add_ref=*/false);
sent_messages.push_back(mref);
sent_messages_with_dest.push_back({to_osd, mref});
int osd, MessageRef m, epoch_t epoch, bool share_map_update=false) override {
dout(0) << "send_cluster_message to " << osd << " " << m << " epoch " << epoch << dendl;
if (messenger) {
- // Use MockMessenger for EventLoop-based routing with epoch tracking
- messenger->send_message(pg_whoami.osd, osd, m.detach());
+ // Use MockMessenger for EventLoop-based routing with epoch tracking.
+ // Pass m.get() rather than m.detach(): MockMessenger::send_message takes
+ // a borrowed reference (it wraps in MessageRef internally for the
+ // duration of encoding), so the caller-side intrusive_ptr `m` should
+ // continue to own the refcount and release it when this function
+ // returns. Using detach() leaked the refcount.
+ messenger->send_message(pg_whoami.osd, osd, m.get());
} else {
// Fall back to direct message queue for TestPeeringState compatibility
messages[osd].push_back(m);
// Helper lambda to create a typed handler that wraps messages and routes to backends
auto make_backend_handler = [this]<typename MsgType>(int msg_type) {
messenger->register_typed_handler<MsgType>(msg_type,
- [this](int from_osd, int to_osd, MsgType* m) -> bool {
+ [this](int from_osd, int to_osd, boost::intrusive_ptr<MsgType> m) -> bool {
auto it = backends.find(to_osd);
ceph_assert(it != backends.end());
- OpRequestRef op = this->op_tracker->create_request<OpRequest, Message*>(m);
+ // OpRequest stores Message* and put()s in its destructor. Use
+ // m.detach() to transfer the +1 refcount to the raw pointer
+ // OpRequest takes ownership of, so the lifetime balances without
+ // any explicit refcount manipulation.
+ OpRequestRef op =
+ this->op_tracker->create_request<OpRequest, Message*>(m.detach());
return it->second->_handle_message(op);
});
};
-
+
// Register typed handlers for all EC message types
make_backend_handler.template operator()<MOSDECSubOpWrite>(MSG_OSD_EC_WRITE);
make_backend_handler.template operator()<MOSDECSubOpWriteReply>(MSG_OSD_EC_WRITE_REPLY);
// Helper lambda to create a typed handler that wraps messages and routes to backends
auto make_backend_handler = [this]<typename MsgType>(int msg_type) {
messenger->register_typed_handler<MsgType>(msg_type,
- [this](int from_osd, int to_osd, MsgType* m) -> bool {
+ [this](int from_osd, int to_osd, boost::intrusive_ptr<MsgType> m) -> bool {
auto it = backends.find(to_osd);
ceph_assert(it != backends.end());
- OpRequestRef op = this->op_tracker->create_request<OpRequest, Message*>(m);
+ // See setup_ec_pool() above: detach to transfer the +1 refcount to
+ // the raw pointer OpRequest will take ownership of.
+ OpRequestRef op =
+ this->op_tracker->create_request<OpRequest, Message*>(m.detach());
return it->second->_handle_message(op);
});
};
-
+
// Register typed handlers for replicated backend message types
make_backend_handler.template operator()<MOSDRepOp>(MSG_OSD_REPOP);
make_backend_handler.template operator()<MOSDRepOpReply>(MSG_OSD_REPOPREPLY);