#include "messages/MLog.h"
#include "messages/MLogAck.h"
#include "mon/MonMap.h"
-#include "mon/MonClient.h"
#include <iostream>
#include <errno.h>
}
LogClient::LogClient(CephContext *cct, Messenger *m, MonMap *mm,
- MonClient *mc, enum logclient_flag_t flags) :
- Dispatcher(cct),
- messenger(m), monmap(mm), monc(mc), is_mon(flags & FLAG_MON),
+ enum logclient_flag_t flags)
+ : cct(cct), messenger(m), monmap(mm), is_mon(flags & FLAG_MON),
log_lock("LogClient::log_lock"), last_log_sent(0), last_log(0)
{
}
m->put();
}
-bool LogClient::ms_dispatch(Message *m)
-{
- ldout(cct,20) << "dispatch " << m << dendl;
-
- switch (m->get_type()) {
- case MSG_LOGACK:
- handle_log_ack((MLogAck*)m);
- return true;
- }
- return false;
-}
#include "common/LogEntry.h"
#include "common/Mutex.h"
-#include "msg/Dispatcher.h"
#include <iosfwd>
#include <sstream>
class MLog;
class MLogAck;
class Messenger;
-class MonClient;
class MonMap;
+class Message;
+class Connection;
class LogClientTemp
{
stringstream ss;
};
-class LogClient : public Dispatcher
+class LogClient
{
public:
enum logclient_flag_t {
};
LogClient(CephContext *cct, Messenger *m, MonMap *mm,
- MonClient *mc, enum logclient_flag_t flags);
+ enum logclient_flag_t flags);
void handle_log_ack(MLogAck *m);
private:
void do_log(clog_type type, std::stringstream& ss);
void do_log(clog_type type, const std::string& s);
- bool ms_dispatch(Message *m);
Message *_get_mon_log_message();
void ms_handle_connect(Connection *con) {}
bool ms_handle_reset(Connection *con) { return false; }
void ms_handle_remote_reset(Connection *con) {}
+ CephContext *cct;
Messenger *messenger;
MonMap *monmap;
- MonClient *monc;
bool is_mon;
Mutex log_lock;
version_t last_log_sent;
standby_replaying(false),
messenger(m),
monc(mc),
- clog(m->cct, messenger, &mc->monmap, mc, LogClient::NO_FLAGS),
+ clog(m->cct, messenger, &mc->monmap, LogClient::NO_FLAGS),
sessionmap(this) {
orig_argc = 0;
dout(10) << sizeof(xlist<void*>::item) << "\t xlist<>::item *2=" << 2*sizeof(xlist<void*>::item) << dendl;
messenger->add_dispatcher_tail(this);
- messenger->add_dispatcher_head(&clog);
// get monmap
monc->set_messenger(messenger);
case CEPH_MSG_AUTH_REPLY:
case CEPH_MSG_MON_SUBSCRIBE_ACK:
case CEPH_MSG_MON_GET_VERSION_REPLY:
+ case MSG_LOGACK:
break;
default:
return false;
break;
case CEPH_MSG_MON_GET_VERSION_REPLY:
handle_get_version_reply((MMonGetVersionReply*)m);
+ break;
+ case MSG_LOGACK:
+ if (log_client) {
+ log_client->handle_log_ack((MLogAck*)m);
+ } else {
+ m->put();
+ }
+ break;
}
return true;
}
+void MonClient::send_log()
+{
+ Message *lm = log_client->get_mon_log_message();
+ if (lm)
+ _send_mon_message(lm);
+}
+
void MonClient::handle_monmap(MMonMap *m)
{
ldout(cct, 10) << "handle_monmap " << *m << dendl;
if (log_client) {
log_client->reset_session();
- Message *lm = log_client->get_mon_log_message();
- if (lm)
- _send_mon_message(lm);
+ send_log();
}
}
if (state == MC_STATE_HAVE_SESSION &&
log_client) {
- Message *m = log_client->get_mon_log_message();
- if (m)
- _send_mon_message(m);
+ send_log();
}
}
bool initialized;
LogClient *log_client;
+ void send_log();
AuthSupported *auth_supported;
has_ever_joined(false),
logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),
monmap(map),
- clog(cct_, messenger, monmap, NULL, LogClient::FLAG_MON),
+ clog(cct_, messenger, monmap, LogClient::FLAG_MON),
key_server(cct),
auth_supported(cct),
store(s),
// i'm ready!
messenger->add_dispatcher_tail(this);
- messenger->add_dispatcher_head(&clog);
// start ticker
timer.init();
paxos_service[PAXOS_LOG]->dispatch((PaxosServiceMessage*)m);
break;
+ case MSG_LOGACK:
+ clog.handle_log_ack((MLogAck*)m);
+ break;
+
// monmap
case MSG_MON_JOIN:
paxos_service[PAXOS_MONMAP]->dispatch((PaxosServiceMessage*)m);
logger(NULL),
store(NULL),
map_in_progress(false),
- clog(external_messenger->cct, client_messenger, &mc->monmap, mc, LogClient::NO_FLAGS),
+ clog(external_messenger->cct, client_messenger, &mc->monmap, LogClient::NO_FLAGS),
whoami(id),
dev_path(dev), journal_path(jdev),
dispatch_running(false),
// i'm ready!
client_messenger->add_dispatcher_head(this);
- client_messenger->add_dispatcher_head(&clog);
cluster_messenger->add_dispatcher_head(this);
hbclient_messenger->add_dispatcher_head(&heartbeat_dispatcher);
#include "heap_profiler.h"
#include "common/environment.h"
#include "common/LogClient.h"
+#include "global/global_context.h"
+#include "common/debug.h"
bool ceph_using_tcmalloc()
{