]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
logclient: not a dispatcher
authorSage Weil <sage@inktank.com>
Tue, 5 Jun 2012 21:51:17 +0000 (14:51 -0700)
committerSage Weil <sage@inktank.com>
Tue, 5 Jun 2012 23:39:08 +0000 (16:39 -0700)
Let MonClient and Monitor handle delivery of messages.  This puts them in
control and lets them trigger sending of more messages when we have a
bunch queued.

Signed-off-by: Sage Weil <sage@inktank.com>
src/common/LogClient.cc
src/common/LogClient.h
src/mds/MDS.cc
src/mon/MonClient.cc
src/mon/MonClient.h
src/mon/Monitor.cc
src/osd/OSD.cc
src/perfglue/heap_profiler.cc

index 7a2513a47fd3c3649f5e11bede5e49365b003de9..7cd96952998072129cb5254e4e4434885a59de80 100644 (file)
@@ -22,7 +22,6 @@
 #include "messages/MLog.h"
 #include "messages/MLogAck.h"
 #include "mon/MonMap.h"
-#include "mon/MonClient.h"
 
 #include <iostream>
 #include <errno.h>
@@ -63,9 +62,8 @@ static inline int clog_type_to_syslog_prio(clog_type t)
 }
 
 LogClient::LogClient(CephContext *cct, Messenger *m, MonMap *mm,
-                    MonClient *mc, enum logclient_flag_t flags) :
-    Dispatcher(cct),
-    messenger(m), monmap(mm), monc(mc), is_mon(flags & FLAG_MON),
+                    enum logclient_flag_t flags)
+  : cct(cct), messenger(m), monmap(mm), is_mon(flags & FLAG_MON),
     log_lock("LogClient::log_lock"), last_log_sent(0), last_log(0)
 {
 }
@@ -199,14 +197,3 @@ void LogClient::handle_log_ack(MLogAck *m)
   m->put();
 }
 
-bool LogClient::ms_dispatch(Message *m)
-{
-  ldout(cct,20) << "dispatch " << m << dendl;
-
-  switch (m->get_type()) {
-  case MSG_LOGACK:
-    handle_log_ack((MLogAck*)m);
-    return true;
-  }
-  return false;
-}
index f839518362172cc34f5c90471bb0869eeee8f1c9..29e255dfc16a1cc23a07f21db8b58b33e96e5ab8 100644 (file)
@@ -17,7 +17,6 @@
 
 #include "common/LogEntry.h"
 #include "common/Mutex.h"
-#include "msg/Dispatcher.h"
 
 #include <iosfwd>
 #include <sstream>
@@ -26,8 +25,9 @@ class LogClient;
 class MLog;
 class MLogAck;
 class Messenger;
-class MonClient;
 class MonMap;
+class Message;
+class Connection;
 
 class LogClientTemp
 {
@@ -48,7 +48,7 @@ private:
   stringstream ss;
 };
 
-class LogClient : public Dispatcher
+class LogClient
 {
 public:
   enum logclient_flag_t {
@@ -57,7 +57,7 @@ public:
   };
 
   LogClient(CephContext *cct, Messenger *m, MonMap *mm,
-           MonClient *mc, enum logclient_flag_t flags);
+           enum logclient_flag_t flags);
 
   void handle_log_ack(MLogAck *m);
 
@@ -98,15 +98,14 @@ public:
 private:
   void do_log(clog_type type, std::stringstream& ss);
   void do_log(clog_type type, const std::string& s);
-  bool ms_dispatch(Message *m);
   Message *_get_mon_log_message();
   void ms_handle_connect(Connection *con) {}
   bool ms_handle_reset(Connection *con) { return false; }
   void ms_handle_remote_reset(Connection *con) {}
 
+  CephContext *cct;
   Messenger *messenger;
   MonMap *monmap;
-  MonClient *monc;
   bool is_mon;
   Mutex log_lock;
   version_t last_log_sent;
index 8213b09e8731682c60695d53bd82192d21894670..202030ca9841d23a3749715a2718b211ce92754f 100644 (file)
@@ -95,7 +95,7 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
   standby_replaying(false),
   messenger(m),
   monc(mc),
-  clog(m->cct, messenger, &mc->monmap, mc, LogClient::NO_FLAGS),
+  clog(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
   sessionmap(this) {
 
   orig_argc = 0;
@@ -439,7 +439,6 @@ int MDS::init(int wanted_state)
   dout(10) << sizeof(xlist<void*>::item) << "\t xlist<>::item   *2=" << 2*sizeof(xlist<void*>::item) << dendl;
 
   messenger->add_dispatcher_tail(this);
-  messenger->add_dispatcher_head(&clog);
 
   // get monmap
   monc->set_messenger(messenger);
index b002e780b0141a6a7e3d09f3054238f172faa5cf..4efaf6503a838bb562ed0fd4e4acc5571bf53c88 100644 (file)
@@ -169,6 +169,7 @@ bool MonClient::ms_dispatch(Message *m)
   case CEPH_MSG_AUTH_REPLY:
   case CEPH_MSG_MON_SUBSCRIBE_ACK:
   case CEPH_MSG_MON_GET_VERSION_REPLY:
+  case MSG_LOGACK:
     break;
   default:
     return false;
@@ -195,10 +196,25 @@ bool MonClient::ms_dispatch(Message *m)
     break;
   case CEPH_MSG_MON_GET_VERSION_REPLY:
     handle_get_version_reply((MMonGetVersionReply*)m);
+    break;
+  case MSG_LOGACK:
+    if (log_client) {
+      log_client->handle_log_ack((MLogAck*)m);
+    } else {
+      m->put();
+    }
+    break;
   }
   return true;
 }
 
+void MonClient::send_log()
+{
+  Message *lm = log_client->get_mon_log_message();
+  if (lm)
+    _send_mon_message(lm);
+}
+
 void MonClient::handle_monmap(MMonMap *m)
 {
   ldout(cct, 10) << "handle_monmap " << *m << dendl;
@@ -368,9 +384,7 @@ void MonClient::handle_auth(MAuthReply *m)
 
       if (log_client) {
        log_client->reset_session();
-       Message *lm = log_client->get_mon_log_message();
-       if (lm)
-         _send_mon_message(lm);
+       send_log();
       }
     }
   
@@ -510,9 +524,7 @@ void MonClient::tick()
    
     if (state == MC_STATE_HAVE_SESSION &&
        log_client) {
-      Message *m = log_client->get_mon_log_message();
-      if (m)
-       _send_mon_message(m);
+      send_log();
     }
   }
 
index 8763950f4fb67babd0070eca1368508ab0df044c..884f77016e799c43a83b9284a2f068325c553eea 100644 (file)
@@ -68,6 +68,7 @@ private:
   bool initialized;
 
   LogClient *log_client;
+  void send_log();
 
   AuthSupported *auth_supported;
 
index 46c1236681a4123828c86fa4f9ee1450c570a71a..3f2f287e08afdeba8720a8d92e76e02e5d764c04 100644 (file)
@@ -97,7 +97,7 @@ Monitor::Monitor(CephContext* cct_, string nm, MonitorStore *s, Messenger *m, Mo
   has_ever_joined(false),
   logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
   monmap(map),
-  clog(cct_, messenger, monmap, NULL, LogClient::FLAG_MON),
+  clog(cct_, messenger, monmap, LogClient::FLAG_MON),
   key_server(cct),
   auth_supported(cct),
   store(s),
@@ -356,7 +356,6 @@ int Monitor::init()
 
   // i'm ready!
   messenger->add_dispatcher_tail(this);
-  messenger->add_dispatcher_head(&clog);
   
   // start ticker
   timer.init();
@@ -1703,6 +1702,10 @@ bool Monitor::_ms_dispatch(Message *m)
       paxos_service[PAXOS_LOG]->dispatch((PaxosServiceMessage*)m);
       break;
 
+    case MSG_LOGACK:
+      clog.handle_log_ack((MLogAck*)m);
+      break;
+
       // monmap
     case MSG_MON_JOIN:
       paxos_service[PAXOS_MONMAP]->dispatch((PaxosServiceMessage*)m);
index 9feec158c6b05feac11fc8936bcc5ebba38ab0e3..a026dcf3004169bf9258d93f4d0ea2fc61e7b1ee 100644 (file)
@@ -595,7 +595,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
   logger(NULL),
   store(NULL),
   map_in_progress(false),
-  clog(external_messenger->cct, client_messenger, &mc->monmap, mc, LogClient::NO_FLAGS),
+  clog(external_messenger->cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
   whoami(id),
   dev_path(dev), journal_path(jdev),
   dispatch_running(false),
@@ -760,7 +760,6 @@ int OSD::init()
     
   // i'm ready!
   client_messenger->add_dispatcher_head(this);
-  client_messenger->add_dispatcher_head(&clog);
   cluster_messenger->add_dispatcher_head(this);
 
   hbclient_messenger->add_dispatcher_head(&heartbeat_dispatcher);
index 31c0064dade495d20089a0e4050ac6d77311c2b0..200b503aa087dbc4712d0974e9bd0bb1ab36fe7e 100644 (file)
@@ -17,6 +17,8 @@
 #include "heap_profiler.h"
 #include "common/environment.h"
 #include "common/LogClient.h"
+#include "global/global_context.h"
+#include "common/debug.h"
 
 bool ceph_using_tcmalloc()
 {