From b0b3c099623282016d7229625060c6be58606c3c Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Thu, 8 Sep 2022 23:12:45 +0000 Subject: [PATCH] crimson/net: Message::conn needs to be a foreign_ptr There are two main consequences of this: 1. Messages can't be default copy constructed in crimson. MMonCommand seems to be the only user, and we simply add a copy constructor that duplicates data portions of the message. 2. We can't casually copy-construct the conn into other structures. The main user here is watch/notify. We use copy() explicitely to populate the object_context structures and avoid passing ConnectionFRef by value. Signed-off-by: Samuel Just --- src/crimson/mon/MonClient.cc | 6 +- src/crimson/mon/MonClient.h | 4 +- src/crimson/net/Fwd.h | 1 + src/crimson/osd/ops_executer.cc | 145 ++++++++++++---------- src/crimson/osd/ops_executer.h | 4 +- src/crimson/osd/pg.h | 6 +- src/crimson/osd/recovery_backend.cc | 28 ++--- src/crimson/osd/watch.cc | 23 ++-- src/crimson/osd/watch.h | 12 +- src/messages/MForward.h | 2 +- src/messages/MMonCommand.h | 10 +- src/msg/Message.h | 6 +- src/test/crimson/test_messenger_thrash.cc | 18 +-- 13 files changed, 144 insertions(+), 121 deletions(-) diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 5eca1e0736c..edb5f4436bb 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -411,8 +411,8 @@ crimson::net::ConnectionRef Connection::get_conn() { return conn; } -Client::mon_command_t::mon_command_t(ceph::ref_t req) - : req(req) +Client::mon_command_t::mon_command_t(MURef req) + : req(std::move(req)) {} Client::Client(crimson::net::Messenger& messenger, @@ -1075,7 +1075,7 @@ Client::run_command(std::string&& cmd, m->set_tid(tid); m->cmd = {std::move(cmd)}; m->set_data(std::move(bl)); - auto& command = mon_commands.emplace_back(ceph::make_message(*m)); + auto& command = mon_commands.emplace_back(crimson::make_message(*m)); return send_message(std::move(m)).then([&result=command.result] { return result.get_future(); }); diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h index b2198f3470d..ac50f821392 100644 --- a/src/crimson/mon/MonClient.h +++ b/src/crimson/mon/MonClient.h @@ -83,9 +83,9 @@ class Client : public crimson::net::Dispatcher, using command_result_t = seastar::future>; struct mon_command_t { - ceph::ref_t req; + MURef req; typename command_result_t::promise_type result; - mon_command_t(ceph::ref_t req); + mon_command_t(MURef req); }; std::vector mon_commands; diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index e10120571f3..c4719a3a4cd 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -38,6 +38,7 @@ using stop_t = seastar::stop_iteration; class Connection; using ConnectionRef = seastar::shared_ptr; +using ConnectionFRef = seastar::foreign_ptr; class Dispatcher; class ChainedDispatchers; diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 60f5da53026..fdf9d3837d8 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -158,41 +158,48 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch( logger().debug("{}", __func__); struct connect_ctx_t { ObjectContext::watch_key_t key; - crimson::net::ConnectionRef conn; + crimson::net::ConnectionFRef conn; watch_info_t info; - connect_ctx_t(const OSDOp& osd_op, const ExecutableMessage& msg) + connect_ctx_t( + const OSDOp& osd_op, + const ExecutableMessage& msg, + crimson::net::ConnectionFRef conn) : key(osd_op.op.watch.cookie, msg.get_reqid().name), - conn(msg.get_connection()), + conn(std::move(conn)), info(create_watch_info(osd_op, msg)) { } }; - return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() }, - [&] (auto& ctx) { - const auto& entity = ctx.key.second; - auto [it, emplaced] = - os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info)); - if (emplaced) { - logger().info("registered new watch {} by {}", it->second, entity); - txn.nop(); - } else { - logger().info("found existing watch {} by {}", it->second, entity); - } - return seastar::now(); - }, - [] (auto&& ctx, ObjectContextRef obc, Ref pg) { - assert(pg); - auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr); - if (emplaced) { - const auto& [cookie, entity] = ctx.key; - it->second = crimson::osd::Watch::create( - obc, ctx.info, entity, std::move(pg)); - logger().info("op_effect: added new watcher: {}", ctx.key); - } else { - logger().info("op_effect: found existing watcher: {}", ctx.key); - } - return it->second->connect(std::move(ctx.conn), true /* will_ping */); - }); + return get_message().get_connection().copy( + ).then([&, this](auto &&conn) { + return with_effect_on_obc( + connect_ctx_t{ osd_op, get_message(), std::move(conn) }, + [&] (auto& ctx) { + const auto& entity = ctx.key.second; + auto [it, emplaced] = + os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info)); + if (emplaced) { + logger().info("registered new watch {} by {}", it->second, entity); + txn.nop(); + } else { + logger().info("found existing watch {} by {}", it->second, entity); + } + return seastar::now(); + }, + [] (auto&& ctx, ObjectContextRef obc, Ref pg) { + assert(pg); + auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr); + if (emplaced) { + const auto& [cookie, entity] = ctx.key; + it->second = crimson::osd::Watch::create( + obc, ctx.info, entity, std::move(pg)); + logger().info("op_effect: added new watcher: {}", ctx.key); + } else { + logger().info("op_effect: found existing watcher: {}", ctx.key); + } + return it->second->connect(std::move(ctx.conn), true /* will_ping */); + }); + }); } OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect( @@ -315,52 +322,56 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify( return crimson::ct_error::enoent::make(); } struct notify_ctx_t { - crimson::net::ConnectionRef conn; + crimson::net::ConnectionFRef conn; notify_info_t ninfo; const uint64_t client_gid; const epoch_t epoch; - notify_ctx_t(const ExecutableMessage& msg) - : conn(msg.get_connection()), + notify_ctx_t(const ExecutableMessage& msg, crimson::net::ConnectionFRef conn) + : conn(std::move(conn)), client_gid(msg.get_reqid().name.num()), epoch(msg.get_map_epoch()) { } }; - return with_effect_on_obc(notify_ctx_t{ get_message() }, - [&] (auto& ctx) { - try { - auto bp = osd_op.indata.cbegin(); - uint32_t ver; // obsolete - ceph::decode(ver, bp); - ceph::decode(ctx.ninfo.timeout, bp); - ceph::decode(ctx.ninfo.bl, bp); - } catch (const buffer::error&) { - ctx.ninfo.timeout = 0; - } - if (!ctx.ninfo.timeout) { - using crimson::common::local_conf; - ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout; - } - ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch); - ctx.ninfo.cookie = osd_op.op.notify.cookie; - // return our unique notify id to the client - ceph::encode(ctx.ninfo.notify_id, osd_op.outdata); - return seastar::now(); - }, - [] (auto&& ctx, ObjectContextRef obc, Ref) { - auto alive_watchers = obc->watchers | boost::adaptors::map_values - | boost::adaptors::filtered( - [] (const auto& w) { - // FIXME: filter as for the `is_ping` in `Watch::start_notify` - return w->is_alive(); - }); - return crimson::osd::Notify::create_n_propagate( - std::begin(alive_watchers), - std::end(alive_watchers), - std::move(ctx.conn), - ctx.ninfo, - ctx.client_gid, - obc->obs.oi.user_version); + return get_message().get_connection().copy( + ).then([&, this](auto &&conn) { + return with_effect_on_obc( + notify_ctx_t{ get_message(), std::move(conn) }, + [&] (auto& ctx) { + try { + auto bp = osd_op.indata.cbegin(); + uint32_t ver; // obsolete + ceph::decode(ver, bp); + ceph::decode(ctx.ninfo.timeout, bp); + ceph::decode(ctx.ninfo.bl, bp); + } catch (const buffer::error&) { + ctx.ninfo.timeout = 0; + } + if (!ctx.ninfo.timeout) { + using crimson::common::local_conf; + ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout; + } + ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch); + ctx.ninfo.cookie = osd_op.op.notify.cookie; + // return our unique notify id to the client + ceph::encode(ctx.ninfo.notify_id, osd_op.outdata); + return seastar::now(); + }, + [] (auto&& ctx, ObjectContextRef obc, Ref) { + auto alive_watchers = obc->watchers | boost::adaptors::map_values + | boost::adaptors::filtered( + [] (const auto& w) { + // FIXME: filter as for the `is_ping` in `Watch::start_notify` + return w->is_alive(); + }); + return crimson::osd::Notify::create_n_propagate( + std::begin(alive_watchers), + std::end(alive_watchers), + std::move(ctx.conn), + ctx.ninfo, + ctx.client_gid, + obc->obs.oi.user_version); + }); }); } diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index b7ea934e322..7f668ba1979 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -94,7 +94,7 @@ public: // with other message types than just the `MOSDOp`. The type erasure // happens in the ctor of `OpsExecuter`. struct ExecutableMessage { - virtual crimson::net::ConnectionRef get_connection() const = 0; + virtual const crimson::net::ConnectionFRef &get_connection() const = 0; virtual osd_reqid_t get_reqid() const = 0; virtual utime_t get_mtime() const = 0; virtual epoch_t get_map_epoch() const = 0; @@ -109,7 +109,7 @@ public: public: ExecutableMessagePimpl(const ImplT* pimpl) : pimpl(pimpl) { } - crimson::net::ConnectionRef get_connection() const final { + const crimson::net::ConnectionFRef &get_connection() const final { return pimpl->get_connection(); } osd_reqid_t get_reqid() const final { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 174a64183a7..c6a35c0aaa9 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -795,8 +795,8 @@ private: }; struct PG::do_osd_ops_params_t { - crimson::net::ConnectionRef get_connection() const { - return nullptr; + crimson::net::ConnectionFRef &get_connection() const { + return conn; } osd_reqid_t get_reqid() const { return reqid; @@ -817,7 +817,7 @@ struct PG::do_osd_ops_params_t { bool has_flag(uint32_t flag) const { return false; } - crimson::net::ConnectionRef conn; + crimson::net::ConnectionFRef &conn; osd_reqid_t reqid; utime_t mtime; epoch_t map_epoch; diff --git a/src/crimson/osd/recovery_backend.cc b/src/crimson/osd/recovery_backend.cc index 89d74798806..62b9a5cf12a 100644 --- a/src/crimson/osd/recovery_backend.cc +++ b/src/crimson/osd/recovery_backend.cc @@ -240,20 +240,20 @@ RecoveryBackend::handle_scan_get_digest( std::move(m.begin), crimson::common::local_conf().get_val("osd_backfill_scan_min"), crimson::common::local_conf().get_val("osd_backfill_scan_max") - ).then_interruptible([this, - query_epoch=m.query_epoch, - conn=m.get_connection()] (auto backfill_interval) { - auto reply = crimson::make_message( - MOSDPGScan::OP_SCAN_DIGEST, - pg.get_pg_whoami(), - pg.get_osdmap_epoch(), - query_epoch, - spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard), - backfill_interval.begin, - backfill_interval.end); - encode(backfill_interval.objects, reply->get_data()); - return conn->send(std::move(reply)); - }); + ).then_interruptible( + [this, query_epoch=m.query_epoch, &conn=*(m.get_connection()) + ](auto backfill_interval) { + auto reply = crimson::make_message( + MOSDPGScan::OP_SCAN_DIGEST, + pg.get_pg_whoami(), + pg.get_osdmap_epoch(), + query_epoch, + spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard), + backfill_interval.begin, + backfill_interval.end); + encode(backfill_interval.objects, reply->get_data()); + return conn.send(std::move(reply)); + }); } RecoveryBackend::interruptible_future<> diff --git a/src/crimson/osd/watch.cc b/src/crimson/osd/watch.cc index 88fe808524c..4257f7114a1 100644 --- a/src/crimson/osd/watch.cc +++ b/src/crimson/osd/watch.cc @@ -47,15 +47,16 @@ const hobject_t& WatchTimeoutRequest::get_target_oid() const PG::do_osd_ops_params_t WatchTimeoutRequest::get_do_osd_ops_params() const { - PG::do_osd_ops_params_t params; - params.conn = watch->conn; - params.reqid.name = watch->entity_name; - // as in the classical's simple_opc_create() - params.mtime = ceph_clock_now(); - params.map_epoch = get_pg().get_osdmap_epoch(); - params.orig_source_inst = { watch->entity_name, watch->winfo.addr }; - //entity_inst_t orig_source_inst; - params.features = 0; + osd_reqid_t reqid; + reqid.name = watch->entity_name; + PG::do_osd_ops_params_t params{ + watch->conn, + reqid, + ceph_clock_now(), + get_pg().get_osdmap_epoch(), + entity_inst_t{ watch->entity_name, watch->winfo.addr }, + 0 + }; logger().debug("{}: params.reqid={}", __func__, params.reqid); return params; } @@ -77,7 +78,7 @@ Watch::~Watch() logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie()); } -seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool) +seastar::future<> Watch::connect(crimson::net::ConnectionFRef conn, bool) { if (this->conn == conn) { logger().debug("conn={} already connected", conn); @@ -210,7 +211,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs) return out; } -Notify::Notify(crimson::net::ConnectionRef conn, +Notify::Notify(crimson::net::ConnectionFRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version) diff --git a/src/crimson/osd/watch.h b/src/crimson/osd/watch.h index f32904cb1ff..7cd76c00f82 100644 --- a/src/crimson/osd/watch.h +++ b/src/crimson/osd/watch.h @@ -34,7 +34,7 @@ class Watch : public seastar::enable_shared_from_this { struct private_ctag_t{}; std::set> in_progress_notifies; - crimson::net::ConnectionRef conn; + crimson::net::ConnectionFRef conn; crimson::osd::ObjectContextRef obc; watch_info_t winfo; @@ -67,7 +67,7 @@ public: } ~Watch(); - seastar::future<> connect(crimson::net::ConnectionRef, bool); + seastar::future<> connect(crimson::net::ConnectionFRef, bool); bool is_alive() const { return true; } @@ -118,7 +118,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs); class Notify : public seastar::enable_shared_from_this { std::set watchers; const notify_info_t ninfo; - crimson::net::ConnectionRef conn; + crimson::net::ConnectionFRef conn; const uint64_t client_gid; const uint64_t user_version; bool complete{false}; @@ -139,14 +139,14 @@ class Notify : public seastar::enable_shared_from_this { /// Called on Notify timeout void do_notify_timeout(); - Notify(crimson::net::ConnectionRef conn, + Notify(crimson::net::ConnectionFRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version); template Notify(WatchIteratorT begin, WatchIteratorT end, - crimson::net::ConnectionRef conn, + crimson::net::ConnectionFRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version); @@ -192,7 +192,7 @@ public: template Notify::Notify(WatchIteratorT begin, WatchIteratorT end, - crimson::net::ConnectionRef conn, + crimson::net::ConnectionFRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version) diff --git a/src/messages/MForward.h b/src/messages/MForward.h index f47de8fc2fa..b6fef9e5bb1 100644 --- a/src/messages/MForward.h +++ b/src/messages/MForward.h @@ -48,7 +48,7 @@ public: tid(t), client_caps(caps), msg(NULL) { client_type = m->get_source().type(); client_addrs = m->get_source_addrs(); - if (auto con = m->get_connection()) { + if (auto &con = m->get_connection()) { client_socket_addr = con->get_peer_socket_addr(); } con_features = feat; diff --git a/src/messages/MMonCommand.h b/src/messages/MMonCommand.h index 0e9693ad9c2..1f1e6728bc9 100644 --- a/src/messages/MMonCommand.h +++ b/src/messages/MMonCommand.h @@ -38,7 +38,14 @@ public: fsid(f) { } -private: + MMonCommand(const MMonCommand &other) + : PaxosServiceMessage(MSG_MON_COMMAND, 0), + fsid(other.fsid), + cmd(other.cmd) { + set_tid(other.get_tid()); + set_data(other.get_data()); + } + ~MMonCommand() final {} public: @@ -81,6 +88,7 @@ public: decode(fsid, p); decode(cmd, p); } + private: template friend boost::intrusive_ptr ceph::make_message(Args&&... args); diff --git a/src/msg/Message.h b/src/msg/Message.h index 69d95fee2ee..a9c71956b0b 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -247,8 +247,10 @@ class Message : public RefCountedObject { public: #ifdef WITH_SEASTAR using ConnectionRef = crimson::net::ConnectionRef; + using ConnectionFRef = crimson::net::ConnectionFRef; #else using ConnectionRef = ::ConnectionRef; + using ConnectionFRef = ::ConnectionRef; #endif // WITH_SEASTAR protected: @@ -269,7 +271,7 @@ protected: /* time at which message was fully read */ utime_t recv_complete_stamp; - ConnectionRef connection; + ConnectionFRef connection; uint32_t magic = 0; @@ -344,7 +346,7 @@ protected: completion_hook->complete(0); } public: - const ConnectionRef& get_connection() const { return connection; } + const ConnectionFRef& get_connection() const { return connection; } void set_connection(ConnectionRef c) { connection = std::move(c); } diff --git a/src/test/crimson/test_messenger_thrash.cc b/src/test/crimson/test_messenger_thrash.cc index ff75070f712..84dd26b330d 100644 --- a/src/test/crimson/test_messenger_thrash.cc +++ b/src/test/crimson/test_messenger_thrash.cc @@ -93,7 +93,7 @@ class SyntheticWorkload; class SyntheticDispatcher final : public crimson::net::Dispatcher { public: - std::map > conn_sent; + std::map > conn_sent; std::map sent; unsigned index; SyntheticWorkload *workload; @@ -124,9 +124,9 @@ class SyntheticDispatcher final if (sent.count(pl.seq)) { logger().info(" {} conn= {} {}", __func__, m->get_connection(), pl); - ceph_assert(conn_sent[m->get_connection()].front() == pl.seq); + ceph_assert(conn_sent[&*m->get_connection()].front() == pl.seq); ceph_assert(pl.data.contents_equal(sent[pl.seq])); - conn_sent[m->get_connection()].pop_front(); + conn_sent[&*m->get_connection()].pop_front(); sent.erase(pl.seq); } @@ -169,7 +169,7 @@ class SyntheticDispatcher final encode(pl, bl); m->set_data(bl); sent[pl.seq] = pl.data; - conn_sent[con].push_back(pl.seq); + conn_sent[&*con].push_back(pl.seq); logger().info("{} conn= {} send i= {}", __func__, con, pl.seq); @@ -181,17 +181,17 @@ class SyntheticDispatcher final } void clear_pending(crimson::net::ConnectionRef con) { - for (std::deque::iterator it = conn_sent[con].begin(); - it != conn_sent[con].end(); ++it) + for (std::deque::iterator it = conn_sent[&*con].begin(); + it != conn_sent[&*con].end(); ++it) sent.erase(*it); - conn_sent.erase(con); + conn_sent.erase(&*con); } void print() { - for (auto && [conn, list] : conn_sent) { + for (auto && [connptr, list] : conn_sent) { if (!list.empty()) { logger().info("{} {} wait {}", __func__, - conn, list.size()); + (void*)connptr, list.size()); } } } -- 2.39.5