]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/net: Message::conn needs to be a foreign_ptr
authorSamuel Just <sjust@redhat.com>
Thu, 8 Sep 2022 23:12:45 +0000 (23:12 +0000)
committerSamuel Just <sjust@redhat.com>
Tue, 27 Sep 2022 02:35:41 +0000 (19:35 -0700)
There are two main consequences of this:
1. Messages can't be default copy constructed in crimson.  MMonCommand
   seems to be the only user, and we simply add a copy constructor that
   duplicates data portions of the message.
2. We can't casually copy-construct the conn into other structures.
   The main user here is watch/notify.  We use copy() explicitely
   to populate the object_context structures and avoid passing
   ConnectionFRef by value.

Signed-off-by: Samuel Just <sjust@redhat.com>
13 files changed:
src/crimson/mon/MonClient.cc
src/crimson/mon/MonClient.h
src/crimson/net/Fwd.h
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h
src/crimson/osd/pg.h
src/crimson/osd/recovery_backend.cc
src/crimson/osd/watch.cc
src/crimson/osd/watch.h
src/messages/MForward.h
src/messages/MMonCommand.h
src/msg/Message.h
src/test/crimson/test_messenger_thrash.cc

index 5eca1e0736cabd9f9c31a3aae56918a5fe13ec95..edb5f4436bbaba566ab54c49fe9da069629d3a19 100644 (file)
@@ -411,8 +411,8 @@ crimson::net::ConnectionRef Connection::get_conn() {
   return conn;
 }
 
-Client::mon_command_t::mon_command_t(ceph::ref_t<MMonCommand> req)
-  : req(req)
+Client::mon_command_t::mon_command_t(MURef<MMonCommand> req)
+  : req(std::move(req))
 {}
 
 Client::Client(crimson::net::Messenger& messenger,
@@ -1075,7 +1075,7 @@ Client::run_command(std::string&& cmd,
   m->set_tid(tid);
   m->cmd = {std::move(cmd)};
   m->set_data(std::move(bl));
-  auto& command = mon_commands.emplace_back(ceph::make_message<MMonCommand>(*m));
+  auto& command = mon_commands.emplace_back(crimson::make_message<MMonCommand>(*m));
   return send_message(std::move(m)).then([&result=command.result] {
     return result.get_future();
   });
index b2198f3470d464d5a0334a1457429d231254d5c7..ac50f821392dc60d85e493c7f18c13a72ea4f48b 100644 (file)
@@ -83,9 +83,9 @@ class Client : public crimson::net::Dispatcher,
   using command_result_t =
     seastar::future<std::tuple<std::int32_t, std::string, ceph::bufferlist>>;
   struct mon_command_t {
-    ceph::ref_t<MMonCommand> req;
+    MURef<MMonCommand> req;
     typename command_result_t::promise_type result;
-    mon_command_t(ceph::ref_t<MMonCommand> req);
+    mon_command_t(MURef<MMonCommand> req);
   };
   std::vector<mon_command_t> mon_commands;
 
index e10120571f3353fc8fe05403377c387152db13e5..c4719a3a4cd53721cb09d899aebe25eea9272315 100644 (file)
@@ -38,6 +38,7 @@ using stop_t = seastar::stop_iteration;
 
 class Connection;
 using ConnectionRef = seastar::shared_ptr<Connection>;
+using ConnectionFRef = seastar::foreign_ptr<ConnectionRef>;
 
 class Dispatcher;
 class ChainedDispatchers;
index 60f5da530260382a64514400af1c3a1bcc6fc43d..fdf9d3837d88cdd25129d19a4e230224ea56ca9d 100644 (file)
@@ -158,41 +158,48 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
   logger().debug("{}", __func__);
   struct connect_ctx_t {
     ObjectContext::watch_key_t key;
-    crimson::net::ConnectionRef conn;
+    crimson::net::ConnectionFRef conn;
     watch_info_t info;
 
-    connect_ctx_t(const OSDOp& osd_op, const ExecutableMessage& msg)
+    connect_ctx_t(
+      const OSDOp& osd_op,
+      const ExecutableMessage& msg,
+      crimson::net::ConnectionFRef conn)
       : key(osd_op.op.watch.cookie, msg.get_reqid().name),
-        conn(msg.get_connection()),
+        conn(std::move(conn)),
         info(create_watch_info(osd_op, msg)) {
     }
   };
-  return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() },
-    [&] (auto& ctx) {
-      const auto& entity = ctx.key.second;
-      auto [it, emplaced] =
-        os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
-      if (emplaced) {
-        logger().info("registered new watch {} by {}", it->second, entity);
-        txn.nop();
-      } else {
-        logger().info("found existing watch {} by {}", it->second, entity);
-      }
-      return seastar::now();
-    },
-    [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
-      assert(pg);
-      auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
-      if (emplaced) {
-        const auto& [cookie, entity] = ctx.key;
-        it->second = crimson::osd::Watch::create(
-          obc, ctx.info, entity, std::move(pg));
-        logger().info("op_effect: added new watcher: {}", ctx.key);
-      } else {
-        logger().info("op_effect: found existing watcher: {}", ctx.key);
-      }
-      return it->second->connect(std::move(ctx.conn), true /* will_ping */);
-    });
+  return get_message().get_connection().copy(
+  ).then([&, this](auto &&conn) {
+    return with_effect_on_obc(
+      connect_ctx_t{ osd_op, get_message(), std::move(conn) },
+      [&] (auto& ctx) {
+       const auto& entity = ctx.key.second;
+       auto [it, emplaced] =
+         os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
+       if (emplaced) {
+         logger().info("registered new watch {} by {}", it->second, entity);
+         txn.nop();
+       } else {
+         logger().info("found existing watch {} by {}", it->second, entity);
+       }
+       return seastar::now();
+      },
+      [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
+       assert(pg);
+       auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
+       if (emplaced) {
+         const auto& [cookie, entity] = ctx.key;
+         it->second = crimson::osd::Watch::create(
+           obc, ctx.info, entity, std::move(pg));
+         logger().info("op_effect: added new watcher: {}", ctx.key);
+       } else {
+         logger().info("op_effect: found existing watcher: {}", ctx.key);
+       }
+       return it->second->connect(std::move(ctx.conn), true /* will_ping */);
+      });
+  });
 }
 
 OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
@@ -315,52 +322,56 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_notify(
     return crimson::ct_error::enoent::make();
   }
   struct notify_ctx_t {
-    crimson::net::ConnectionRef conn;
+    crimson::net::ConnectionFRef conn;
     notify_info_t ninfo;
     const uint64_t client_gid;
     const epoch_t epoch;
 
-    notify_ctx_t(const ExecutableMessage& msg)
-      : conn(msg.get_connection()),
+    notify_ctx_t(const ExecutableMessage& msg, crimson::net::ConnectionFRef conn)
+      : conn(std::move(conn)),
         client_gid(msg.get_reqid().name.num()),
         epoch(msg.get_map_epoch()) {
     }
   };
-  return with_effect_on_obc(notify_ctx_t{ get_message() },
-    [&] (auto& ctx) {
-      try {
-        auto bp = osd_op.indata.cbegin();
-        uint32_t ver; // obsolete
-        ceph::decode(ver, bp);
-        ceph::decode(ctx.ninfo.timeout, bp);
-        ceph::decode(ctx.ninfo.bl, bp);
-      } catch (const buffer::error&) {
-        ctx.ninfo.timeout = 0;
-      }
-      if (!ctx.ninfo.timeout) {
-        using crimson::common::local_conf;
-        ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
-      }
-      ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
-      ctx.ninfo.cookie = osd_op.op.notify.cookie;
-      // return our unique notify id to the client
-      ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
-      return seastar::now();
-    },
-    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
-      auto alive_watchers = obc->watchers | boost::adaptors::map_values
-                                          | boost::adaptors::filtered(
-        [] (const auto& w) {
-          // FIXME: filter as for the `is_ping` in `Watch::start_notify`
-          return w->is_alive();
-        });
-      return crimson::osd::Notify::create_n_propagate(
-        std::begin(alive_watchers),
-        std::end(alive_watchers),
-        std::move(ctx.conn),
-        ctx.ninfo,
-        ctx.client_gid,
-        obc->obs.oi.user_version);
+  return get_message().get_connection().copy(
+  ).then([&, this](auto &&conn) {
+    return with_effect_on_obc(
+      notify_ctx_t{ get_message(), std::move(conn) },
+      [&] (auto& ctx) {
+       try {
+         auto bp = osd_op.indata.cbegin();
+         uint32_t ver; // obsolete
+         ceph::decode(ver, bp);
+         ceph::decode(ctx.ninfo.timeout, bp);
+         ceph::decode(ctx.ninfo.bl, bp);
+       } catch (const buffer::error&) {
+         ctx.ninfo.timeout = 0;
+       }
+       if (!ctx.ninfo.timeout) {
+         using crimson::common::local_conf;
+         ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
+       }
+       ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
+       ctx.ninfo.cookie = osd_op.op.notify.cookie;
+       // return our unique notify id to the client
+       ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
+       return seastar::now();
+      },
+      [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
+       auto alive_watchers = obc->watchers | boost::adaptors::map_values
+         | boost::adaptors::filtered(
+           [] (const auto& w) {
+             // FIXME: filter as for the `is_ping` in `Watch::start_notify`
+             return w->is_alive();
+           });
+       return crimson::osd::Notify::create_n_propagate(
+         std::begin(alive_watchers),
+         std::end(alive_watchers),
+         std::move(ctx.conn),
+         ctx.ninfo,
+         ctx.client_gid,
+         obc->obs.oi.user_version);
+      });
   });
 }
 
index b7ea934e3228b420a10878ca8ead24d66648476f..7f668ba19795f6e16a1117be41c2b321b22d9ea1 100644 (file)
@@ -94,7 +94,7 @@ public:
   // with other message types than just the `MOSDOp`. The type erasure
   // happens in the ctor of `OpsExecuter`.
   struct ExecutableMessage {
-    virtual crimson::net::ConnectionRef get_connection() const = 0;
+    virtual const crimson::net::ConnectionFRef &get_connection() const = 0;
     virtual osd_reqid_t get_reqid() const = 0;
     virtual utime_t get_mtime() const = 0;
     virtual epoch_t get_map_epoch() const = 0;
@@ -109,7 +109,7 @@ public:
   public:
     ExecutableMessagePimpl(const ImplT* pimpl) : pimpl(pimpl) {
     }
-    crimson::net::ConnectionRef get_connection() const final {
+    const crimson::net::ConnectionFRef &get_connection() const final {
       return pimpl->get_connection();
     }
     osd_reqid_t get_reqid() const final {
index 174a64183a7a88cc07179ff6bd2f2a01cf11ef97..c6a35c0aaa96510b7492d06823e6bd5c6ce987cb 100644 (file)
@@ -795,8 +795,8 @@ private:
 };
 
 struct PG::do_osd_ops_params_t {
-  crimson::net::ConnectionRef get_connection() const {
-    return nullptr;
+  crimson::net::ConnectionFRef &get_connection() const {
+    return conn;
   }
   osd_reqid_t get_reqid() const {
     return reqid;
@@ -817,7 +817,7 @@ struct PG::do_osd_ops_params_t {
   bool has_flag(uint32_t flag) const {
     return false;
  }
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionFRef &conn;
   osd_reqid_t reqid;
   utime_t mtime;
   epoch_t map_epoch;
index 89d74798806bb00d35de3d9e86e25dd5e4f7214b..62b9a5cf12a8b9434092a54ca60d488783fdd955 100644 (file)
@@ -240,20 +240,20 @@ RecoveryBackend::handle_scan_get_digest(
     std::move(m.begin),
     crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_min"),
     crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_max")
-  ).then_interruptible([this,
-          query_epoch=m.query_epoch,
-          conn=m.get_connection()] (auto backfill_interval) {
-    auto reply = crimson::make_message<MOSDPGScan>(
-      MOSDPGScan::OP_SCAN_DIGEST,
-      pg.get_pg_whoami(),
-      pg.get_osdmap_epoch(),
-      query_epoch,
-      spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard),
-      backfill_interval.begin,
-      backfill_interval.end);
-    encode(backfill_interval.objects, reply->get_data());
-    return conn->send(std::move(reply));
-  });
+  ).then_interruptible(
+    [this, query_epoch=m.query_epoch, &conn=*(m.get_connection())
+    ](auto backfill_interval) {
+      auto reply = crimson::make_message<MOSDPGScan>(
+       MOSDPGScan::OP_SCAN_DIGEST,
+       pg.get_pg_whoami(),
+       pg.get_osdmap_epoch(),
+       query_epoch,
+       spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard),
+       backfill_interval.begin,
+       backfill_interval.end);
+      encode(backfill_interval.objects, reply->get_data());
+      return conn.send(std::move(reply));
+    });
 }
 
 RecoveryBackend::interruptible_future<>
index 88fe808524cc42f59b53be6f6effb7435577f5b2..4257f7114a150caa61b1d746f47c20a780d06225 100644 (file)
@@ -47,15 +47,16 @@ const hobject_t& WatchTimeoutRequest::get_target_oid() const
 PG::do_osd_ops_params_t
 WatchTimeoutRequest::get_do_osd_ops_params() const
 {
-  PG::do_osd_ops_params_t params;
-  params.conn = watch->conn;
-  params.reqid.name = watch->entity_name;
-  // as in the classical's simple_opc_create()
-  params.mtime = ceph_clock_now();
-  params.map_epoch = get_pg().get_osdmap_epoch();
-  params.orig_source_inst = { watch->entity_name, watch->winfo.addr };
-  //entity_inst_t orig_source_inst;
-  params.features = 0;
+  osd_reqid_t reqid;
+  reqid.name = watch->entity_name;
+  PG::do_osd_ops_params_t params{
+    watch->conn,
+    reqid,
+    ceph_clock_now(),
+    get_pg().get_osdmap_epoch(),
+    entity_inst_t{ watch->entity_name, watch->winfo.addr },
+    0
+  };
   logger().debug("{}: params.reqid={}", __func__, params.reqid);
   return params;
 }
@@ -77,7 +78,7 @@ Watch::~Watch()
   logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
 }
 
-seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
+seastar::future<> Watch::connect(crimson::net::ConnectionFRef conn, bool)
 {
   if (this->conn == conn) {
     logger().debug("conn={} already connected", conn);
@@ -210,7 +211,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs)
   return out;
 }
 
-Notify::Notify(crimson::net::ConnectionRef conn,
+Notify::Notify(crimson::net::ConnectionFRef conn,
                const notify_info_t& ninfo,
                const uint64_t client_gid,
                const uint64_t user_version)
index f32904cb1ffc72b2ac33bbc1c7f7d62e1cd77c57..7cd76c00f827f1d917ceccfe37bd0948861a4e60 100644 (file)
@@ -34,7 +34,7 @@ class Watch : public seastar::enable_shared_from_this<Watch> {
   struct private_ctag_t{};
 
   std::set<NotifyRef, std::less<>> in_progress_notifies;
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionFRef conn;
   crimson::osd::ObjectContextRef obc;
 
   watch_info_t winfo;
@@ -67,7 +67,7 @@ public:
   }
   ~Watch();
 
-  seastar::future<> connect(crimson::net::ConnectionRef, bool);
+  seastar::future<> connect(crimson::net::ConnectionFRef, bool);
   bool is_alive() const {
     return true;
   }
@@ -118,7 +118,7 @@ std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs);
 class Notify : public seastar::enable_shared_from_this<Notify> {
   std::set<WatchRef> watchers;
   const notify_info_t ninfo;
-  crimson::net::ConnectionRef conn;
+  crimson::net::ConnectionFRef conn;
   const uint64_t client_gid;
   const uint64_t user_version;
   bool complete{false};
@@ -139,14 +139,14 @@ class Notify : public seastar::enable_shared_from_this<Notify> {
   /// Called on Notify timeout
   void do_notify_timeout();
 
-  Notify(crimson::net::ConnectionRef conn,
+  Notify(crimson::net::ConnectionFRef conn,
          const notify_info_t& ninfo,
          const uint64_t client_gid,
          const uint64_t user_version);
   template <class WatchIteratorT>
   Notify(WatchIteratorT begin,
          WatchIteratorT end,
-         crimson::net::ConnectionRef conn,
+         crimson::net::ConnectionFRef conn,
          const notify_info_t& ninfo,
          const uint64_t client_gid,
          const uint64_t user_version);
@@ -192,7 +192,7 @@ public:
 template <class WatchIteratorT>
 Notify::Notify(WatchIteratorT begin,
                WatchIteratorT end,
-               crimson::net::ConnectionRef conn,
+               crimson::net::ConnectionFRef conn,
                const notify_info_t& ninfo,
                const uint64_t client_gid,
                const uint64_t user_version)
index f47de8fc2fa825545a9f01338ffb5ef13115e8cc..b6fef9e5bb1528efb2042d50bd1a134cbee3ea0e 100644 (file)
@@ -48,7 +48,7 @@ public:
     tid(t), client_caps(caps), msg(NULL) {
     client_type = m->get_source().type();
     client_addrs = m->get_source_addrs();
-    if (auto con = m->get_connection()) {
+    if (auto &con = m->get_connection()) {
       client_socket_addr = con->get_peer_socket_addr();
     }
     con_features = feat;
index 0e9693ad9c24f59d993a99a189b58e8b84924898..1f1e6728bc9b086332c171c9e9759d9a59da9bbe 100644 (file)
@@ -38,7 +38,14 @@ public:
       fsid(f)
   { }
 
-private:
+  MMonCommand(const MMonCommand &other)
+    : PaxosServiceMessage(MSG_MON_COMMAND, 0),
+      fsid(other.fsid),
+      cmd(other.cmd) {
+    set_tid(other.get_tid());
+    set_data(other.get_data());
+  }
+
   ~MMonCommand() final {}
 
 public:
@@ -81,6 +88,7 @@ public:
     decode(fsid, p);
     decode(cmd, p);
   }
+
 private:
   template<class T, typename... Args>
   friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
index 69d95fee2ee048c8185be6594d9dbac46a1208bb..a9c71956b0b000b56a1a282fac3757b7465401df 100644 (file)
@@ -247,8 +247,10 @@ class Message : public RefCountedObject {
 public:
 #ifdef WITH_SEASTAR
   using ConnectionRef = crimson::net::ConnectionRef;
+  using ConnectionFRef = crimson::net::ConnectionFRef;
 #else
   using ConnectionRef = ::ConnectionRef;
+  using ConnectionFRef = ::ConnectionRef;
 #endif // WITH_SEASTAR
 
 protected:
@@ -269,7 +271,7 @@ protected:
   /* time at which message was fully read */
   utime_t recv_complete_stamp;
 
-  ConnectionRef connection;
+  ConnectionFRef connection;
 
   uint32_t magic = 0;
 
@@ -344,7 +346,7 @@ protected:
       completion_hook->complete(0);
   }
 public:
-  const ConnectionRef& get_connection() const { return connection; }
+  const ConnectionFRef& get_connection() const { return connection; }
   void set_connection(ConnectionRef c) {
     connection = std::move(c);
   }
index ff75070f71256b86e6f3d3ba5ac67e3657029e5e..84dd26b330db656f0edb98b30dd2842c400fe2f3 100644 (file)
@@ -93,7 +93,7 @@ class SyntheticWorkload;
 class SyntheticDispatcher final
     : public crimson::net::Dispatcher {
   public:
-  std::map<crimson::net::ConnectionRef, std::deque<payload_seq_t> > conn_sent;
+  std::map<crimson::net::Connection*, std::deque<payload_seq_t> > conn_sent;
   std::map<payload_seq_t, bufferlist> sent;
   unsigned index;
   SyntheticWorkload *workload;
@@ -124,9 +124,9 @@ class SyntheticDispatcher final
       if (sent.count(pl.seq)) {
         logger().info(" {} conn= {} {}", __func__,
           m->get_connection(), pl);
-        ceph_assert(conn_sent[m->get_connection()].front() == pl.seq);
+        ceph_assert(conn_sent[&*m->get_connection()].front() == pl.seq);
         ceph_assert(pl.data.contents_equal(sent[pl.seq]));
-        conn_sent[m->get_connection()].pop_front();
+        conn_sent[&*m->get_connection()].pop_front();
         sent.erase(pl.seq);
       }
 
@@ -169,7 +169,7 @@ class SyntheticDispatcher final
     encode(pl, bl);
     m->set_data(bl);
     sent[pl.seq] = pl.data;
-    conn_sent[con].push_back(pl.seq);
+    conn_sent[&*con].push_back(pl.seq);
     logger().info("{} conn= {} send i= {}",
       __func__, con, pl.seq);
 
@@ -181,17 +181,17 @@ class SyntheticDispatcher final
   }
 
   void clear_pending(crimson::net::ConnectionRef con) {
-    for (std::deque<uint64_t>::iterator it = conn_sent[con].begin();
-         it != conn_sent[con].end(); ++it)
+    for (std::deque<uint64_t>::iterator it = conn_sent[&*con].begin();
+         it != conn_sent[&*con].end(); ++it)
       sent.erase(*it);
-    conn_sent.erase(con);
+    conn_sent.erase(&*con);
   }
 
   void print() {
-    for (auto && [conn, list] : conn_sent) {
+    for (auto && [connptr, list] : conn_sent) {
       if (!list.empty()) {
         logger().info("{} {} wait {}", __func__,
-                      conn, list.size());
+                      (void*)connptr, list.size());
       }
     }
   }