From 235f92744563758c6b10f1ae79f5402b35282b2c Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 5 Jun 2012 14:51:17 -0700 Subject: [PATCH] logclient: not a dispatcher Let MonClient and Monitor handle delivery of messages. This puts them in control and lets them trigger sending of more messages when we have a bunch queued. Signed-off-by: Sage Weil --- src/common/LogClient.cc | 17 ++--------------- src/common/LogClient.h | 11 +++++------ src/mds/MDS.cc | 3 +-- src/mon/MonClient.cc | 24 ++++++++++++++++++------ src/mon/MonClient.h | 1 + src/mon/Monitor.cc | 7 +++++-- src/osd/OSD.cc | 3 +-- src/perfglue/heap_profiler.cc | 2 ++ 8 files changed, 35 insertions(+), 33 deletions(-) diff --git a/src/common/LogClient.cc b/src/common/LogClient.cc index 7a2513a47fd3c..7cd9695299807 100644 --- a/src/common/LogClient.cc +++ b/src/common/LogClient.cc @@ -22,7 +22,6 @@ #include "messages/MLog.h" #include "messages/MLogAck.h" #include "mon/MonMap.h" -#include "mon/MonClient.h" #include #include @@ -63,9 +62,8 @@ static inline int clog_type_to_syslog_prio(clog_type t) } LogClient::LogClient(CephContext *cct, Messenger *m, MonMap *mm, - MonClient *mc, enum logclient_flag_t flags) : - Dispatcher(cct), - messenger(m), monmap(mm), monc(mc), is_mon(flags & FLAG_MON), + enum logclient_flag_t flags) + : cct(cct), messenger(m), monmap(mm), is_mon(flags & FLAG_MON), log_lock("LogClient::log_lock"), last_log_sent(0), last_log(0) { } @@ -199,14 +197,3 @@ void LogClient::handle_log_ack(MLogAck *m) m->put(); } -bool LogClient::ms_dispatch(Message *m) -{ - ldout(cct,20) << "dispatch " << m << dendl; - - switch (m->get_type()) { - case MSG_LOGACK: - handle_log_ack((MLogAck*)m); - return true; - } - return false; -} diff --git a/src/common/LogClient.h b/src/common/LogClient.h index f839518362172..29e255dfc16a1 100644 --- a/src/common/LogClient.h +++ b/src/common/LogClient.h @@ -17,7 +17,6 @@ #include "common/LogEntry.h" #include "common/Mutex.h" -#include "msg/Dispatcher.h" #include #include @@ -26,8 +25,9 @@ class LogClient; class MLog; class MLogAck; class Messenger; -class MonClient; class MonMap; +class Message; +class Connection; class LogClientTemp { @@ -48,7 +48,7 @@ private: stringstream ss; }; -class LogClient : public Dispatcher +class LogClient { public: enum logclient_flag_t { @@ -57,7 +57,7 @@ public: }; LogClient(CephContext *cct, Messenger *m, MonMap *mm, - MonClient *mc, enum logclient_flag_t flags); + enum logclient_flag_t flags); void handle_log_ack(MLogAck *m); @@ -98,15 +98,14 @@ public: private: void do_log(clog_type type, std::stringstream& ss); void do_log(clog_type type, const std::string& s); - bool ms_dispatch(Message *m); Message *_get_mon_log_message(); void ms_handle_connect(Connection *con) {} bool ms_handle_reset(Connection *con) { return false; } void ms_handle_remote_reset(Connection *con) {} + CephContext *cct; Messenger *messenger; MonMap *monmap; - MonClient *monc; bool is_mon; Mutex log_lock; version_t last_log_sent; diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index 8213b09e87316..202030ca9841d 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -95,7 +95,7 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) : standby_replaying(false), messenger(m), monc(mc), - clog(m->cct, messenger, &mc->monmap, mc, LogClient::NO_FLAGS), + clog(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS), sessionmap(this) { orig_argc = 0; @@ -439,7 +439,6 @@ int MDS::init(int wanted_state) dout(10) << sizeof(xlist::item) << "\t xlist<>::item *2=" << 2*sizeof(xlist::item) << dendl; messenger->add_dispatcher_tail(this); - messenger->add_dispatcher_head(&clog); // get monmap monc->set_messenger(messenger); diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index b002e780b0141..4efaf6503a838 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -169,6 +169,7 @@ bool MonClient::ms_dispatch(Message *m) case CEPH_MSG_AUTH_REPLY: case CEPH_MSG_MON_SUBSCRIBE_ACK: case CEPH_MSG_MON_GET_VERSION_REPLY: + case MSG_LOGACK: break; default: return false; @@ -195,10 +196,25 @@ bool MonClient::ms_dispatch(Message *m) break; case CEPH_MSG_MON_GET_VERSION_REPLY: handle_get_version_reply((MMonGetVersionReply*)m); + break; + case MSG_LOGACK: + if (log_client) { + log_client->handle_log_ack((MLogAck*)m); + } else { + m->put(); + } + break; } return true; } +void MonClient::send_log() +{ + Message *lm = log_client->get_mon_log_message(); + if (lm) + _send_mon_message(lm); +} + void MonClient::handle_monmap(MMonMap *m) { ldout(cct, 10) << "handle_monmap " << *m << dendl; @@ -368,9 +384,7 @@ void MonClient::handle_auth(MAuthReply *m) if (log_client) { log_client->reset_session(); - Message *lm = log_client->get_mon_log_message(); - if (lm) - _send_mon_message(lm); + send_log(); } } @@ -510,9 +524,7 @@ void MonClient::tick() if (state == MC_STATE_HAVE_SESSION && log_client) { - Message *m = log_client->get_mon_log_message(); - if (m) - _send_mon_message(m); + send_log(); } } diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index 8763950f4fb67..884f77016e799 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -68,6 +68,7 @@ private: bool initialized; LogClient *log_client; + void send_log(); AuthSupported *auth_supported; diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 46c1236681a41..3f2f287e08afd 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -97,7 +97,7 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorStore *s, Messenger *m, Mo has_ever_joined(false), logger(NULL), cluster_logger(NULL), cluster_logger_registered(false), monmap(map), - clog(cct_, messenger, monmap, NULL, LogClient::FLAG_MON), + clog(cct_, messenger, monmap, LogClient::FLAG_MON), key_server(cct), auth_supported(cct), store(s), @@ -356,7 +356,6 @@ int Monitor::init() // i'm ready! messenger->add_dispatcher_tail(this); - messenger->add_dispatcher_head(&clog); // start ticker timer.init(); @@ -1703,6 +1702,10 @@ bool Monitor::_ms_dispatch(Message *m) paxos_service[PAXOS_LOG]->dispatch((PaxosServiceMessage*)m); break; + case MSG_LOGACK: + clog.handle_log_ack((MLogAck*)m); + break; + // monmap case MSG_MON_JOIN: paxos_service[PAXOS_MONMAP]->dispatch((PaxosServiceMessage*)m); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 9feec158c6b05..a026dcf300416 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -595,7 +595,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, logger(NULL), store(NULL), map_in_progress(false), - clog(external_messenger->cct, client_messenger, &mc->monmap, mc, LogClient::NO_FLAGS), + clog(external_messenger->cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS), whoami(id), dev_path(dev), journal_path(jdev), dispatch_running(false), @@ -760,7 +760,6 @@ int OSD::init() // i'm ready! client_messenger->add_dispatcher_head(this); - client_messenger->add_dispatcher_head(&clog); cluster_messenger->add_dispatcher_head(this); hbclient_messenger->add_dispatcher_head(&heartbeat_dispatcher); diff --git a/src/perfglue/heap_profiler.cc b/src/perfglue/heap_profiler.cc index 31c0064dade49..200b503aa087d 100644 --- a/src/perfglue/heap_profiler.cc +++ b/src/perfglue/heap_profiler.cc @@ -17,6 +17,8 @@ #include "heap_profiler.h" #include "common/environment.h" #include "common/LogClient.h" +#include "global/global_context.h" +#include "common/debug.h" bool ceph_using_tcmalloc() { -- 2.39.5