]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
mon/MonClient: add send_mon_message(MessageRef)
authorKefu Chai <kchai@redhat.com>
Wed, 18 Sep 2019 04:30:36 +0000 (12:30 +0800)
committerKefu Chai <kchai@redhat.com>
Fri, 20 Sep 2019 10:21:56 +0000 (18:21 +0800)
Signed-off-by: Kefu Chai <kchai@redhat.com>
src/common/LogClient.cc
src/common/LogClient.h
src/mon/MonClient.cc
src/mon/MonClient.h

index 24aa31841d6e3140dfb00e5b81962bad61347574..b941bd47e3974c3f535b6e1915181b0630032f85 100644 (file)
@@ -238,7 +238,7 @@ void LogChannel::do_log(clog_type prio, const std::string& s)
   }
 }
 
-Message *LogClient::get_mon_log_message(bool flush)
+ceph::ref_t<Message> LogClient::get_mon_log_message(bool flush)
 {
   std::lock_guard l(log_lock);
   if (flush) {
@@ -256,18 +256,18 @@ bool LogClient::are_pending()
   return last_log > last_log_sent;
 }
 
-Message *LogClient::_get_mon_log_message()
+ceph::ref_t<Message> LogClient::_get_mon_log_message()
 {
   ceph_assert(ceph_mutex_is_locked(log_lock));
   if (log_queue.empty())
-    return NULL;
+    return {};
 
   // only send entries that haven't been sent yet during this mon
   // session!  monclient needs to call reset_session() on mon session
   // reset for this to work right.
 
   if (last_log_sent == last_log)
-    return NULL;
+    return {};
 
   // limit entries per message
   unsigned num_unsent = last_log - last_log_sent;
@@ -296,10 +296,8 @@ Message *LogClient::_get_mon_log_message()
     ++p;
   }
   
-  MLog *log = new MLog(monmap->get_fsid());
-  log->entries.swap(o);
-
-  return log;
+  return ceph::make_message<MLog>(monmap->get_fsid(),
+                                 std::move(o));
 }
 
 void LogClient::_send_to_mon()
@@ -308,8 +306,8 @@ void LogClient::_send_to_mon()
   ceph_assert(is_mon);
   ceph_assert(messenger->get_myname().is_mon());
   ldout(cct,10) << __func__ << " log to self" << dendl;
-  Message *log = _get_mon_log_message();
-  messenger->get_loopback_connection()->send_message(log);
+  auto log = _get_mon_log_message();
+  messenger->get_loopback_connection()->send_message2(std::move(log));
 }
 
 version_t LogClient::queue(LogEntry &entry)
index 84b41823661adc32c8f359b22feff2a6a7805728..179fc2eec3fcfe8ba3d2e0a82ef84aed6b973d2c 100644 (file)
@@ -17,8 +17,9 @@
 
 #include <atomic>
 #include "common/LogEntry.h"
-#include "common/ostream_temp.h"
 #include "common/ceph_mutex.h"
+#include "common/ostream_temp.h"
+#include "common/ref.h"
 #include "include/health.h"
 
 class LogClient;
@@ -201,7 +202,7 @@ public:
   }
 
   bool handle_log_ack(MLogAck *m);
-  Message *get_mon_log_message(bool flush);
+  ceph::ref_t<Message> get_mon_log_message(bool flush);
   bool are_pending();
 
   LogChannelRef create_channel() {
@@ -235,7 +236,7 @@ public:
   version_t queue(LogEntry &entry);
 
 private:
-  Message *_get_mon_log_message();
+  ceph::ref_t<Message> _get_mon_log_message();
   void _send_to_mon();
 
   CephContext *cct;
index ce1860a9d887b658fb5f8a922e6c71fac98aa68c..efad81026aae60bc353a442e4fa57fbde5ee8e43 100644 (file)
@@ -347,9 +347,9 @@ bool MonClient::ms_dispatch(Message *m)
 void MonClient::send_log(bool flush)
 {
   if (log_client) {
-    Message *lm = log_client->get_mon_log_message(flush);
+    auto lm = log_client->get_mon_log_message(flush);
     if (lm)
-      _send_mon_message(lm);
+      _send_mon_message(std::move(lm));
     more_log_pending = log_client->are_pending();
   }
 }
@@ -483,11 +483,9 @@ void MonClient::shutdown()
     auto tid = mon_commands.begin()->first;
     _cancel_mon_command(tid);
   }
-  while (!waiting_for_session.empty()) {
-    ldout(cct, 20) << __func__ << " discarding pending message " << *waiting_for_session.front() << dendl;
-    waiting_for_session.front()->put();
-    waiting_for_session.pop_front();
-  }
+  ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
+                << " pending message(s)" << dendl;
+  waiting_for_session.clear();
 
   active_con.reset();
   pending_cons.clear();
@@ -623,7 +621,13 @@ void MonClient::_finish_auth(int auth_err)
 
 // ---------
 
-void MonClient::_send_mon_message(Message *m)
+void MonClient::send_mon_message(MessageRef m)
+{
+  std::lock_guard l{monc_lock};
+  _send_mon_message(std::move(m));
+}
+
+void MonClient::_send_mon_message(MessageRef m)
 {
   ceph_assert(ceph_mutex_is_locked(monc_lock));
   if (active_con) {
@@ -631,9 +635,9 @@ void MonClient::_send_mon_message(Message *m)
     ldout(cct, 10) << "_send_mon_message to mon."
                   << monmap.get_name(cur_con->get_peer_addr())
                   << " at " << cur_con->get_peer_addr() << dendl;
-    cur_con->send_message(m);
+    cur_con->send_message2(std::move(m));
   } else {
-    waiting_for_session.push_back(m);
+    waiting_for_session.push_back(std::move(m));
   }
 }
 
@@ -654,10 +658,7 @@ void MonClient::_reopen_session(int rank)
   }
 
   // throw out old queued messages
-  while (!waiting_for_session.empty()) {
-    waiting_for_session.front()->put();
-    waiting_for_session.pop_front();
-  }
+  waiting_for_session.clear();
 
   // throw out version check requests
   while (!version_requests.empty()) {
@@ -822,7 +823,7 @@ void MonClient::_finish_hunting(int auth_err)
   if (!auth_err) {
     last_rotating_renew_sent = utime_t();
     while (!waiting_for_session.empty()) {
-      _send_mon_message(waiting_for_session.front());
+      _send_mon_message(std::move(waiting_for_session.front()));
       waiting_for_session.pop_front();
     }
     _resend_mon_commands();
@@ -919,10 +920,10 @@ void MonClient::_renew_subs()
   if (!_opened())
     _reopen_session();
   else {
-    MMonSubscribe *m = new MMonSubscribe;
+    auto m = ceph::make_message<MMonSubscribe>();
     m->what = sub.get_subs();
     m->hostname = ceph_get_short_hostname();
-    _send_mon_message(m);
+    _send_mon_message(std::move(m));
     sub.renewed();
   }
 }
@@ -939,7 +940,7 @@ int MonClient::_check_auth_tickets()
   if (active_con && auth) {
     if (auth->need_tickets()) {
       ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
-      MAuth *m = new MAuth;
+      auto m = ceph::make_message<MAuth>();
       m->protocol = auth->get_protocol();
       auth->prepare_build_request();
       auth->build_request(m->auth_payload);
@@ -989,13 +990,11 @@ int MonClient::_check_auth_rotating()
                    << last_rotating_renew_sent << "), skipping refresh" << dendl;
     return 0;
   }
-  MAuth *m = new MAuth;
+  auto m = ceph::make_message<MAuth>();
   m->protocol = auth->get_protocol();
   if (auth->build_rotating_request(m->auth_payload)) {
     last_rotating_renew_sent = now;
-    _send_mon_message(m);
-  } else {
-    m->put();
+    _send_mon_message(std::move(m));
   }
   return 0;
 }
@@ -1077,11 +1076,11 @@ void MonClient::_send_command(MonCommand *r)
   }
 
   ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
-  MMonCommand *m = new MMonCommand(monmap.fsid);
+  auto m = ceph::make_message<MMonCommand>(monmap.fsid);
   m->set_tid(r->tid);
   m->cmd = r->cmd;
   m->set_data(r->inbl);
-  _send_mon_message(m);
+  _send_mon_message(std::move(m));
   return;
 }
 
@@ -1245,11 +1244,11 @@ void MonClient::get_version(string map, version_t *newest, version_t *oldest, Co
   version_req_d *req = new version_req_d(onfinish, newest, oldest);
   ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
   std::lock_guard l(monc_lock);
-  MMonGetVersion *m = new MMonGetVersion();
+  auto m = ceph::make_message<MMonGetVersion>();
   m->what = map;
   m->handle = ++version_req_id;
   version_requests[m->handle] = req;
-  _send_mon_message(m);
+  _send_mon_message(std::move(m));
 }
 
 void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
index bee891ce9334449e2d5ae6e45befcd2b1582defb..6c16d4d08d19d1fe619e123d88e705300dfba7c4 100644 (file)
@@ -287,7 +287,7 @@ private:
   int authenticate_err = 0;
   bool authenticated = false;
 
-  std::list<Message*> waiting_for_session;
+  std::list<MessageRef> waiting_for_session;
   utime_t last_rotating_renew_sent;
   std::unique_ptr<Context> session_established_context;
   bool had_a_connection;
@@ -304,7 +304,7 @@ private:
   MonConnection& _add_conn(unsigned rank, uint64_t global_id);
   void _un_backoff();
   void _add_conns(uint64_t global_id);
-  void _send_mon_message(Message *m);
+  void _send_mon_message(MessageRef m);
 
   std::map<entity_addrvec_t, MonConnection>::iterator _find_pending_con(
     const ConnectionRef& con) {
@@ -448,9 +448,9 @@ public:
   int ping_monitor(const std::string &mon_id, std::string *result_reply);
 
   void send_mon_message(Message *m) {
-    std::lock_guard l(monc_lock);
-    _send_mon_message(m);
+    send_mon_message(MessageRef{m, false});
   }
+  void send_mon_message(MessageRef m);
   /**
    * If you specify a callback, you should not call
    * reopen_session() again until it has been triggered. The MonClient