ceph.git commitdiff — git-server-git.apps.pok.os.sepia.ceph.com Git
crimson/net: Use MessageURef in messenger internals
author: Amnon Hanuhov <ahanukov@redhat.com>
Thu, 3 Jun 2021 11:47:00 +0000 (14:47 +0300)
committer: Amnon Hanuhov <ahanukov@redhat.com>
Sat, 19 Jun 2021 14:03:08 +0000 (17:03 +0300)
Signed-off-by: Amnon Hanuhov <ahanukov@redhat.com>
16 files changed:
src/crimson/mgr/client.cc
src/crimson/mgr/client.h
src/crimson/net/Connection.h
src/crimson/net/Protocol.cc
src/crimson/net/Protocol.h
src/crimson/net/ProtocolV2.cc
src/crimson/net/ProtocolV2.h
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/osd/osd.cc
src/crimson/osd/osd.h
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/pg.cc
src/crimson/osd/pg.h
src/crimson/osd/pg_recovery.cc
src/crimson/osd/pg_recovery.h

index db888fe470f886b88b66734a626f087931c760a7..5d259c29e13c01b83b3a3f532aef3bc377287a87 100644 (file)
@@ -70,7 +70,7 @@ void Client::ms_handle_connect(crimson::net::ConnectionRef c)
   gate.dispatch_in_background(__func__, *this, [this, c] {
     if (conn == c) {
       // ask for the mgrconfigure message
-      auto m = ceph::make_message<MMgrOpen>();
+      auto m = crimson::make_message<MMgrOpen>();
       m->daemon_name = local_conf()->name.get_id();
       return conn->send(std::move(m));
     } else {
index ad7e1fde54e8e0a52c43ebea26277dddeb374c9e..17d62d03477128d3fb3bc82956868eaeb17b4e0c 100644 (file)
@@ -24,7 +24,7 @@ namespace crimson::mgr
 // implement WithStats if you want to report stats to mgr periodically
 class WithStats {
 public:
-  virtual MessageRef get_stats() const = 0;
+  virtual MessageURef get_stats() const = 0;
   virtual ~WithStats() {}
 };
 
index 4e10dded230e54f768ef1732d38bdb7b7d2ea692..a494a6baacbc0f146033bc71b1bdd2ebee077443 100644 (file)
@@ -127,8 +127,6 @@ class Connection : public seastar::enable_shared_from_this<Connection> {
 
   /// send a message over a connection that has completed its handshake
   virtual seastar::future<> send(MessageURef msg) = 0;
-  // The version with MessageRef will be dropped in the future
-  virtual seastar::future<> send(MessageRef msg) = 0;
 
   /// send a keepalive message over a connection that has completed its
   /// handshake
index 804a80923d6480dce5beac791fbad747a95ad370..30ee539d2567cd75462f52812f7eef3a0bd2c9ff 100644 (file)
@@ -110,7 +110,7 @@ ceph::bufferlist Protocol::sweep_messages_and_move_to_sent(
   return bl;
 }
 
-seastar::future<> Protocol::send(MessageRef msg)
+seastar::future<> Protocol::send(MessageURef msg)
 {
   if (write_state != write_state_t::drop) {
     conn.out_q.push_back(std::move(msg));
@@ -153,7 +153,7 @@ void Protocol::requeue_sent()
   conn.out_seq -= conn.sent.size();
   logger().debug("{} requeue {} items, revert out_seq to {}",
                  conn, conn.sent.size(), conn.out_seq);
-  for (MessageRef& msg : conn.sent) {
+  for (MessageURef& msg : conn.sent) {
     msg->clear_payload();
     msg->set_seq(0);
   }
@@ -204,7 +204,7 @@ void Protocol::ack_writes(seq_num_t seq)
   }
   while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) {
     logger().trace("{} got ack seq {} >= {}, pop {}",
-                   conn, seq, conn.sent.front()->get_seq(), conn.sent.front());
+                   conn, seq, conn.sent.front()->get_seq(), *conn.sent.front());
     conn.sent.pop_front();
   }
 }
index ce88629ba6c244e2174f037db145e7ee2a81d417..0343f026025aa9a7fcb56180e651f1f8513abf59 100644 (file)
@@ -56,7 +56,7 @@ class Protocol {
   virtual void trigger_close() = 0;
 
   virtual ceph::bufferlist do_sweep_messages(
-      const std::deque<MessageRef>& msgs,
+      const std::deque<MessageURef>& msgs,
       size_t num_msgs,
       bool require_keepalive,
       std::optional<utime_t> keepalive_ack,
@@ -90,7 +90,7 @@ class Protocol {
 
 // the write state-machine
  public:
-  seastar::future<> send(MessageRef msg);
+  seastar::future<> send(MessageURef msg);
   seastar::future<> keepalive();
 
 // TODO: encapsulate a SessionedSender class
index 15376fe8a14208e689fd3ca91c0de35400f1e2bf..370085a43a123e30429917926d699369886a2737 100644 (file)
@@ -1792,7 +1792,7 @@ void ProtocolV2::trigger_replacing(bool reconnect,
 // READY state
 
 ceph::bufferlist ProtocolV2::do_sweep_messages(
-    const std::deque<MessageRef>& msgs,
+    const std::deque<MessageURef>& msgs,
     size_t num_msgs,
     bool require_keepalive,
     std::optional<utime_t> _keepalive_ack,
@@ -1818,7 +1818,7 @@ ceph::bufferlist ProtocolV2::do_sweep_messages(
     INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
   }
 
-  std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+  std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
     // TODO: move to common code
     // set priority
     msg->get_header().src = messenger.get_myname();
index be9a2281668757ff6149399242ba628b393328a1..319802690cc3ca698a18734926b6ac35aa81c8b0 100644 (file)
@@ -31,7 +31,7 @@ class ProtocolV2 final : public Protocol {
   void trigger_close() override;
 
   ceph::bufferlist do_sweep_messages(
-      const std::deque<MessageRef>& msgs,
+      const std::deque<MessageURef>& msgs,
       size_t num_msgs,
       bool require_keepalive,
       std::optional<utime_t> keepalive_ack,
index 630dbdfa8c8ae7da6fbe12dcf776d3f1eb4c0541..a119b9a96c673e12b3c1daccc46eb13961de3965 100644 (file)
@@ -70,12 +70,6 @@ bool SocketConnection::peer_wins() const
 }
 
 seastar::future<> SocketConnection::send(MessageURef msg)
-{
-  assert(seastar::this_shard_id() == shard_id());
-  return protocol->send(MessageRef{msg.release(), false});
-}
-
-seastar::future<> SocketConnection::send(MessageRef msg)
 {
   assert(seastar::this_shard_id() == shard_id());
   return protocol->send(std::move(msg));
index 068d8886ac4f4b1d82d726beefde307edb1deeb6..e2bdc24853d4895ec625c425710bc5d10552be6b 100644 (file)
@@ -44,9 +44,9 @@ class SocketConnection : public Connection {
   bool update_rx_seq(seq_num_t seq);
 
   // messages to be resent after connection gets reset
-  std::deque<MessageRef> out_q;
+  std::deque<MessageURef> out_q;
   // messages sent, but not yet acked by peer
-  std::deque<MessageRef> sent;
+  std::deque<MessageURef> sent;
 
   seastar::shard_id shard_id() const;
 
@@ -70,7 +70,6 @@ class SocketConnection : public Connection {
 #endif
 
   seastar::future<> send(MessageURef msg) override;
-  seastar::future<> send(MessageRef msg) override;
 
   seastar::future<> keepalive() override;
 
index 539e3a65b6737e3e21d25b442a8aa2e3cd1a546d..65cf8cc459d96971358140fe21e040516da0a59a 100644 (file)
@@ -760,11 +760,11 @@ void OSD::update_stats()
   });
 }
 
-MessageRef OSD::get_stats() const
+MessageURef OSD::get_stats() const
 {
   // todo: m-to-n: collect stats using map-reduce
   // MPGStats::had_map_for is not used since PGMonitor was removed
-  auto m = ceph::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
+  auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
   m->osd_stat = osd_stat;
   for (auto [pgid, pg] : pg_map.get_pgs()) {
     if (pg->is_primary()) {
index ebce16bf8ccf691b18446740ea462399bf44d0bb..59b7cf50c0083668fc5985ddbcd2ae549860b7bb 100644 (file)
@@ -105,7 +105,7 @@ class OSD final : public crimson::net::Dispatcher,
   osd_stat_t osd_stat;
   uint32_t osd_stat_seq = 0;
   void update_stats();
-  MessageRef get_stats() const final;
+  MessageURef get_stats() const final;
 
   // AuthHandler methods
   void handle_authentication(const EntityName& name,
index 93b633afb5ac836b4c83b416c80b0c23d33cd1e1..2244791ffe4cd45deb846039046c12e76990b5c4 100644 (file)
@@ -141,8 +141,8 @@ ClientRequest::process_pg_op(
   Ref<PG> &pg)
 {
   return pg->do_pg_ops(m)
-    .then_interruptible([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) {
-      return conn->send(reply);
+    .then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
+      return conn->send(std::move(reply));
     });
 }
 
@@ -214,10 +214,10 @@ ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
     }).then_interruptible(
       [this, pg, all_completed=std::move(all_completed)]() mutable {
       return all_completed.safe_then_interruptible(
-        [this, pg](Ref<MOSDOpReply> reply) {
+        [this, pg](MURef<MOSDOpReply> reply) {
         return with_blocking_future_interruptible<IOInterruptCondition>(
             handle.enter(pp(*pg).send_reply)).then_interruptible(
-              [this, reply=std::move(reply)] {
+              [this, reply=std::move(reply)]() mutable{
               return conn->send(std::move(reply));
             });
       }, crimson::ct_error::eagain::handle([this, pg]() mutable {
index 066798417a4884dd146283e7f8f7f285b789c632..c7ad0e1360dc1dc108e456de064d307faa265712 100644 (file)
@@ -736,7 +736,7 @@ PG::do_osd_ops_execute(
   }));
 }
 
-PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ref<MOSDOpReply>>>
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
 PG::do_osd_ops(
   Ref<MOSDOp> m,
   ObjectContextRef obc,
@@ -745,7 +745,7 @@ PG::do_osd_ops(
   if (__builtin_expect(stopping, false)) {
     throw crimson::common::system_shutdown_exception();
   }
-  return do_osd_ops_execute<Ref<MOSDOpReply>>(
+  return do_osd_ops_execute<MURef<MOSDOpReply>>(
     seastar::make_lw_shared<OpsExecuter>(
       std::move(obc), op_info, get_pool().info, get_backend(), *m),
     m->ops,
@@ -757,7 +757,7 @@ PG::do_osd_ops(
       if (result > 0 && !rvec) {
         result = 0;
       }
-      auto reply = ceph::make_message<MOSDOpReply>(m.get(),
+      auto reply = crimson::make_message<MOSDOpReply>(m.get(),
                                              result,
                                              get_osdmap_epoch(),
                                              0,
@@ -767,16 +767,16 @@ PG::do_osd_ops(
         "do_osd_ops: {} - object {} sending reply",
         *m,
         m->get_hobj());
-      return do_osd_ops_iertr::make_ready_future<Ref<MOSDOpReply>>(
+      return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
         std::move(reply));
     },
     [m, this] (const std::error_code& e) {
-      auto reply = ceph::make_message<MOSDOpReply>(
+      auto reply = crimson::make_message<MOSDOpReply>(
         m.get(), -e.value(), get_osdmap_epoch(), 0, false);
       reply->set_enoent_reply_versions(
         peering_state.get_info().last_update,
         peering_state.get_info().last_user_version);
-      return do_osd_ops_iertr::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+      return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
     });
 }
 
@@ -798,7 +798,7 @@ PG::do_osd_ops(
     std::move(failure_func));
 }
 
-PG::interruptible_future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
+PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
 {
   if (__builtin_expect(stopping, false)) {
     throw crimson::common::system_shutdown_exception();
@@ -810,16 +810,16 @@ PG::interruptible_future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
     logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
     return ox->execute_op(osd_op);
   }).then_interruptible([m, this, ox = std::move(ox)] {
-    auto reply = ceph::make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+    auto reply = crimson::make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                            CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
                                            false);
-    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+    return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
   }).handle_exception_type_interruptible([=](const crimson::osd::error& e) {
-    auto reply = ceph::make_message<MOSDOpReply>(
+    auto reply = crimson::make_message<MOSDOpReply>(
       m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
     reply->set_enoent_reply_versions(peering_state.get_info().last_update,
                                     peering_state.get_info().last_user_version);
-    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+    return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
   });
 }
 
index d56decf724a7b7e9d2481428015010ef9fa4f106..f8c531f03596042e1181d773ff4783564c349c67 100644 (file)
@@ -571,7 +571,7 @@ private:
   using pg_rep_op_fut_t =
     std::tuple<interruptible_future<>,
                do_osd_ops_iertr::future<Ret>>;
-  do_osd_ops_iertr::future<pg_rep_op_fut_t<Ref<MOSDOpReply>>> do_osd_ops(
+  do_osd_ops_iertr::future<pg_rep_op_fut_t<MURef<MOSDOpReply>>> do_osd_ops(
     Ref<MOSDOp> m,
     ObjectContextRef obc,
     const OpInfo &op_info);
@@ -594,7 +594,7 @@ private:
     const OpInfo &op_info,
     SuccessFunc&& success_func,
     FailureFunc&& failure_func);
-  interruptible_future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
+  interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
   std::tuple<interruptible_future<>, interruptible_future<>>
   submit_transaction(
     const OpInfo& op_info,
index 1b8c865f21f48aa51c268a911612130a3467c9bd..e879cb9ec14c207343b50196e5500dd10dd914cc 100644 (file)
@@ -459,7 +459,7 @@ void PGRecovery::enqueue_drop(
   // allocate a pair if target is seen for the first time
   auto& req = backfill_drop_requests[target];
   if (!req) {
-    req = ceph::make_message<MOSDPGBackfillRemove>(
+    req = crimson::make_message<MOSDPGBackfillRemove>(
       spg_t(pg->get_pgid().pgid, target.shard), pg->get_osdmap_epoch());
   }
   req->ls.emplace_back(obj, v);
index 931b6d7a9c6d3b3503f0dfa0db666be4b2f21d45..7120342dd583700f100e83c4199b57ba0e701913 100644 (file)
@@ -89,7 +89,7 @@ private:
   // backfill begin
   std::unique_ptr<crimson::osd::BackfillState> backfill_state;
   std::map<pg_shard_t,
-           ceph::ref_t<MOSDPGBackfillRemove>> backfill_drop_requests;
+           MURef<MOSDPGBackfillRemove>> backfill_drop_requests;
 
   template <class EventT>
   void start_backfill_recovery(