}
}
-Message *LogClient::get_mon_log_message(bool flush)
+ceph::ref_t<Message> LogClient::get_mon_log_message(bool flush)
{
std::lock_guard l(log_lock);
if (flush) {
return last_log > last_log_sent;
}
-Message *LogClient::_get_mon_log_message()
+ceph::ref_t<Message> LogClient::_get_mon_log_message()
{
ceph_assert(ceph_mutex_is_locked(log_lock));
if (log_queue.empty())
- return NULL;
+ return {};
// only send entries that haven't been sent yet during this mon
// session! monclient needs to call reset_session() on mon session
// reset for this to work right.
if (last_log_sent == last_log)
- return NULL;
+ return {};
// limit entries per message
unsigned num_unsent = last_log - last_log_sent;
++p;
}
- MLog *log = new MLog(monmap->get_fsid());
- log->entries.swap(o);
-
- return log;
+ return ceph::make_message<MLog>(monmap->get_fsid(),
+ std::move(o));
}
void LogClient::_send_to_mon()
ceph_assert(is_mon);
ceph_assert(messenger->get_myname().is_mon());
ldout(cct,10) << __func__ << " log to self" << dendl;
- Message *log = _get_mon_log_message();
- messenger->get_loopback_connection()->send_message(log);
+ auto log = _get_mon_log_message();
+ messenger->get_loopback_connection()->send_message2(std::move(log));
}
version_t LogClient::queue(LogEntry &entry)
void MonClient::send_log(bool flush)
{
if (log_client) {
- Message *lm = log_client->get_mon_log_message(flush);
+ auto lm = log_client->get_mon_log_message(flush);
if (lm)
- _send_mon_message(lm);
+ _send_mon_message(std::move(lm));
more_log_pending = log_client->are_pending();
}
}
auto tid = mon_commands.begin()->first;
_cancel_mon_command(tid);
}
- while (!waiting_for_session.empty()) {
- ldout(cct, 20) << __func__ << " discarding pending message " << *waiting_for_session.front() << dendl;
- waiting_for_session.front()->put();
- waiting_for_session.pop_front();
- }
+ ldout(cct, 20) << __func__ << " discarding " << waiting_for_session.size()
+ << " pending message(s)" << dendl;
+ waiting_for_session.clear();
active_con.reset();
pending_cons.clear();
// ---------
-void MonClient::_send_mon_message(Message *m)
+void MonClient::send_mon_message(MessageRef m)
+{
+ std::lock_guard l{monc_lock};
+ _send_mon_message(std::move(m));
+}
+
+void MonClient::_send_mon_message(MessageRef m)
{
ceph_assert(ceph_mutex_is_locked(monc_lock));
if (active_con) {
ldout(cct, 10) << "_send_mon_message to mon."
<< monmap.get_name(cur_con->get_peer_addr())
<< " at " << cur_con->get_peer_addr() << dendl;
- cur_con->send_message(m);
+ cur_con->send_message2(std::move(m));
} else {
- waiting_for_session.push_back(m);
+ waiting_for_session.push_back(std::move(m));
}
}
}
// throw out old queued messages
- while (!waiting_for_session.empty()) {
- waiting_for_session.front()->put();
- waiting_for_session.pop_front();
- }
+ waiting_for_session.clear();
// throw out version check requests
while (!version_requests.empty()) {
if (!auth_err) {
last_rotating_renew_sent = utime_t();
while (!waiting_for_session.empty()) {
- _send_mon_message(waiting_for_session.front());
+ _send_mon_message(std::move(waiting_for_session.front()));
waiting_for_session.pop_front();
}
_resend_mon_commands();
if (!_opened())
_reopen_session();
else {
- MMonSubscribe *m = new MMonSubscribe;
+ auto m = ceph::make_message<MMonSubscribe>();
m->what = sub.get_subs();
m->hostname = ceph_get_short_hostname();
- _send_mon_message(m);
+ _send_mon_message(std::move(m));
sub.renewed();
}
}
if (active_con && auth) {
if (auth->need_tickets()) {
ldout(cct, 10) << __func__ << " getting new tickets!" << dendl;
- MAuth *m = new MAuth;
+ auto m = ceph::make_message<MAuth>();
m->protocol = auth->get_protocol();
auth->prepare_build_request();
auth->build_request(m->auth_payload);
<< last_rotating_renew_sent << "), skipping refresh" << dendl;
return 0;
}
- MAuth *m = new MAuth;
+ auto m = ceph::make_message<MAuth>();
m->protocol = auth->get_protocol();
if (auth->build_rotating_request(m->auth_payload)) {
last_rotating_renew_sent = now;
- _send_mon_message(m);
- } else {
- m->put();
+ _send_mon_message(std::move(m));
}
return 0;
}
}
ldout(cct, 10) << __func__ << " " << r->tid << " " << r->cmd << dendl;
- MMonCommand *m = new MMonCommand(monmap.fsid);
+ auto m = ceph::make_message<MMonCommand>(monmap.fsid);
m->set_tid(r->tid);
m->cmd = r->cmd;
m->set_data(r->inbl);
- _send_mon_message(m);
+ _send_mon_message(std::move(m));
return;
}
version_req_d *req = new version_req_d(onfinish, newest, oldest);
ldout(cct, 10) << "get_version " << map << " req " << req << dendl;
std::lock_guard l(monc_lock);
- MMonGetVersion *m = new MMonGetVersion();
+ auto m = ceph::make_message<MMonGetVersion>();
m->what = map;
m->handle = ++version_req_id;
version_requests[m->handle] = req;
- _send_mon_message(m);
+ _send_mon_message(std::move(m));
}
void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
int authenticate_err = 0;
bool authenticated = false;
- std::list<Message*> waiting_for_session;
+ std::list<MessageRef> waiting_for_session;
utime_t last_rotating_renew_sent;
std::unique_ptr<Context> session_established_context;
bool had_a_connection;
MonConnection& _add_conn(unsigned rank, uint64_t global_id);
void _un_backoff();
void _add_conns(uint64_t global_id);
- void _send_mon_message(Message *m);
+ void _send_mon_message(MessageRef m);
std::map<entity_addrvec_t, MonConnection>::iterator _find_pending_con(
const ConnectionRef& con) {
int ping_monitor(const std::string &mon_id, std::string *result_reply);
void send_mon_message(Message *m) {
- std::lock_guard l(monc_lock);
- _send_mon_message(m);
+ send_mon_message(MessageRef{m, false});
}
+ void send_mon_message(MessageRef m);
/**
* If you specify a callback, you should not call
* reopen_session() again until it has been triggered. The MonClient