From a144cacdd88bf8e9afed97e2d0e5d4f9f9557fe9 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Wed, 18 Sep 2019 12:30:36 +0800 Subject: [PATCH] mon/MonClient: add send_mon_message(MessageRef) Signed-off-by: Kefu Chai --- src/common/LogClient.cc | 18 +++++++-------- src/common/LogClient.h | 7 +++--- src/mon/MonClient.cc | 51 ++++++++++++++++++++--------------------- src/mon/MonClient.h | 8 +++---- 4 files changed, 41 insertions(+), 43 deletions(-) diff --git a/src/common/LogClient.cc b/src/common/LogClient.cc index 24aa31841d6..b941bd47e39 100644 --- a/src/common/LogClient.cc +++ b/src/common/LogClient.cc @@ -238,7 +238,7 @@ void LogChannel::do_log(clog_type prio, const std::string& s) } } -Message *LogClient::get_mon_log_message(bool flush) +ceph::ref_t LogClient::get_mon_log_message(bool flush) { std::lock_guard l(log_lock); if (flush) { @@ -256,18 +256,18 @@ bool LogClient::are_pending() return last_log > last_log_sent; } -Message *LogClient::_get_mon_log_message() +ceph::ref_t LogClient::_get_mon_log_message() { ceph_assert(ceph_mutex_is_locked(log_lock)); if (log_queue.empty()) - return NULL; + return {}; // only send entries that haven't been sent yet during this mon // session! monclient needs to call reset_session() on mon session // reset for this to work right. if (last_log_sent == last_log) - return NULL; + return {}; // limit entries per message unsigned num_unsent = last_log - last_log_sent; @@ -296,10 +296,8 @@ Message *LogClient::_get_mon_log_message() ++p; } - MLog *log = new MLog(monmap->get_fsid()); - log->entries.swap(o); - - return log; + return ceph::make_message(monmap->get_fsid(), + std::move(o)); } void LogClient::_send_to_mon() @@ -308,8 +306,8 @@ void LogClient::_send_to_mon() ceph_assert(is_mon); ceph_assert(messenger->get_myname().is_mon()); ldout(cct,10) << __func__ << " log to self" << dendl; - Message *log = _get_mon_log_message(); - messenger->get_loopback_connection()->send_message(log); + auto log = _get_mon_log_message(); + messenger->get_loopback_connection()->send_message2(std::move(log)); } version_t LogClient::queue(LogEntry &entry) diff --git a/src/common/LogClient.h b/src/common/LogClient.h index 84b41823661..179fc2eec3f 100644 --- a/src/common/LogClient.h +++ b/src/common/LogClient.h @@ -17,8 +17,9 @@ #include #include "common/LogEntry.h" -#include "common/ostream_temp.h" #include "common/ceph_mutex.h" +#include "common/ostream_temp.h" +#include "common/ref.h" #include "include/health.h" class LogClient; @@ -201,7 +202,7 @@ public: } bool handle_log_ack(MLogAck *m); - Message *get_mon_log_message(bool flush); + ceph::ref_t get_mon_log_message(bool flush); bool are_pending(); LogChannelRef create_channel() { @@ -235,7 +236,7 @@ public: version_t queue(LogEntry &entry); private: - Message *_get_mon_log_message(); + ceph::ref_t _get_mon_log_message(); void _send_to_mon(); CephContext *cct; diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index ce1860a9d88..efad81026aa 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -347,9 +347,9 @@ bool MonClient::ms_dispatch(Message *m) void MonClient::send_log(bool flush) { if (log_client) { - Message *lm = log_client->get_mon_log_message(flush); + auto lm = log_client->get_mon_log_message(flush); if (lm) - _send_mon_message(lm); + _send_mon_message(std::move(lm)); more_log_pending = log_client->are_pending(); } } @@ -483,11 +483,9 @@ void MonClient::shutdown() auto tid = mon_commands.begin()->first; _cancel_mon_command(tid); } - while (!waiting_for_session.empty()) { - ldout(cct, 20) << __func__ << " discarding pending message " << *waiting_for_session.front() << dendl; - waiting_for_session.front()->put(); - waiting_for_session.pop_front(); - } + ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size() + << " pending message(s)" << dendl; + waiting_for_session.clear(); active_con.reset(); pending_cons.clear(); @@ -623,7 +621,13 @@ void MonClient::_finish_auth(int auth_err) // --------- -void MonClient::_send_mon_message(Message *m) +void MonClient::send_mon_message(MessageRef m) +{ + std::lock_guard l{monc_lock}; + _send_mon_message(std::move(m)); +} + +void MonClient::_send_mon_message(MessageRef m) { ceph_assert(ceph_mutex_is_locked(monc_lock)); if (active_con) { @@ -631,9 +635,9 @@ void MonClient::_send_mon_message(Message *m) ldout(cct, 10) << "_send_mon_message to mon." << monmap.get_name(cur_con->get_peer_addr()) << " at " << cur_con->get_peer_addr() << dendl; - cur_con->send_message(m); + cur_con->send_message2(std::move(m)); } else { - waiting_for_session.push_back(m); + waiting_for_session.push_back(std::move(m)); } } @@ -654,10 +658,7 @@ void MonClient::_reopen_session(int rank) } // throw out old queued messages - while (!waiting_for_session.empty()) { - waiting_for_session.front()->put(); - waiting_for_session.pop_front(); - } + waiting_for_session.clear(); // throw out version check requests while (!version_requests.empty()) { @@ -822,7 +823,7 @@ void MonClient::_finish_hunting(int auth_err) if (!auth_err) { last_rotating_renew_sent = utime_t(); while (!waiting_for_session.empty()) { - _send_mon_message(waiting_for_session.front()); + _send_mon_message(std::move(waiting_for_session.front())); waiting_for_session.pop_front(); } _resend_mon_commands(); @@ -919,10 +920,10 @@ void MonClient::_renew_subs() if (!_opened()) _reopen_session(); else { - MMonSubscribe *m = new MMonSubscribe; + auto m = ceph::make_message(); m->what = sub.get_subs(); m->hostname = ceph_get_short_hostname(); - _send_mon_message(m); + _send_mon_message(std::move(m)); sub.renewed(); } } @@ -939,7 +940,7 @@ int MonClient::_check_auth_tickets() if (active_con && auth) { if (auth->need_tickets()) { ldout(cct, 10) << __func__ << " getting new tickets!" << dendl; - MAuth *m = new MAuth; + auto m = ceph::make_message(); m->protocol = auth->get_protocol(); auth->prepare_build_request(); auth->build_request(m->auth_payload); @@ -989,13 +990,11 @@ int MonClient::_check_auth_rotating() << last_rotating_renew_sent << "), skipping refresh" << dendl; return 0; } - MAuth *m = new MAuth; + auto m = ceph::make_message(); m->protocol = auth->get_protocol(); if (auth->build_rotating_request(m->auth_payload)) { last_rotating_renew_sent = now; - _send_mon_message(m); - } else { - m->put(); + _send_mon_message(std::move(m)); } return 0; } @@ -1077,11 +1076,11 @@ void MonClient::_send_command(MonCommand *r) } ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl; - MMonCommand *m = new MMonCommand(monmap.fsid); + auto m = ceph::make_message(monmap.fsid); m->set_tid(r->tid); m->cmd = r->cmd; m->set_data(r->inbl); - _send_mon_message(m); + _send_mon_message(std::move(m)); return; } @@ -1245,11 +1244,11 @@ void MonClient::get_version(string map, version_t *newest, version_t *oldest, Co version_req_d *req = new version_req_d(onfinish, newest, oldest); ldout(cct, 10) << "get_version " << map << " req " << req << dendl; std::lock_guard l(monc_lock); - MMonGetVersion *m = new MMonGetVersion(); + auto m = ceph::make_message(); m->what = map; m->handle = ++version_req_id; version_requests[m->handle] = req; - _send_mon_message(m); + _send_mon_message(std::move(m)); } void MonClient::handle_get_version_reply(MMonGetVersionReply* m) diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index bee891ce933..6c16d4d08d1 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -287,7 +287,7 @@ private: int authenticate_err = 0; bool authenticated = false; - std::list waiting_for_session; + std::list waiting_for_session; utime_t last_rotating_renew_sent; std::unique_ptr session_established_context; bool had_a_connection; @@ -304,7 +304,7 @@ private: MonConnection& _add_conn(unsigned rank, uint64_t global_id); void _un_backoff(); void _add_conns(uint64_t global_id); - void _send_mon_message(Message *m); + void _send_mon_message(MessageRef m); std::map::iterator _find_pending_con( const ConnectionRef& con) { @@ -448,9 +448,9 @@ public: int ping_monitor(const std::string &mon_id, std::string *result_reply); void send_mon_message(Message *m) { - std::lock_guard l(monc_lock); - _send_mon_message(m); + send_mon_message(MessageRef{m, false}); } + void send_mon_message(MessageRef m); /** * If you specify a callback, you should not call * reopen_session() again until it has been triggered. The MonClient -- 2.39.5