messages/MMonMap.h\
messages/MMonPaxos.h\
messages/MMonProbe.h\
+ messages/MMonScrub.h \
messages/MMonSubscribe.h\
messages/MMonSubscribeAck.h\
messages/MMonSync.h \
#define CEPH_FEATURE_OSDHASHPSPOOL (1ULL<<30)
#define CEPH_FEATURE_MON_SINGLE_PAXOS (1ULL<<31)
#define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32)
+#define CEPH_FEATURE_MON_SCRUB (1ULL<<33)
/*
* Features supported. Should be everything above.
CEPH_FEATURE_MDSENC | \
CEPH_FEATURE_OSDHASHPSPOOL | \
CEPH_FEATURE_MON_SINGLE_PAXOS | \
- CEPH_FEATURE_OSD_SNAPMAPPER)
+ CEPH_FEATURE_OSD_SNAPMAPPER | \
+ CEPH_FEATURE_MON_SCRUB | \
+ 0ULL)
#define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+* Ceph - scalable distributed file system
+*
+* Copyright (C) 2013 Inktank, Inc.
+*
+* This is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License version 2.1, as published by the Free Software
+* Foundation. See file COPYING.
+*/
+#ifndef CEPH_MMONSCRUB_H
+#define CEPH_MMONSCRUB_H
+
+#include "msg/Message.h"
+#include "mon/mon_types.h"
+
+class MMonScrub : public Message
+{
+ static const int HEAD_VERSION = 1;
+ static const int COMPAT_VERSION = 1;
+
+public:
+ typedef enum {
+ OP_SCRUB = 1, // leader->peon: scrub (a range of) keys
+ OP_RESULT = 2, // peon->leader: result of a scrub
+ } op_type_t;
+
+ static const char *get_opname(op_type_t op) {
+ switch (op) {
+ case OP_SCRUB: return "scrub";
+ case OP_RESULT: return "result";
+ default: assert("unknown op type"); return NULL;
+ }
+ }
+
+ op_type_t op;
+ version_t version;
+ ScrubResult result;
+
+ MMonScrub()
+ : Message(MSG_MON_SCRUB, HEAD_VERSION, COMPAT_VERSION)
+ { }
+
+ MMonScrub(op_type_t op, version_t v)
+ : Message(MSG_MON_SCRUB, HEAD_VERSION, COMPAT_VERSION),
+ op(op), version(v)
+ { }
+
+ const char *get_type_name() const { return "mon_scrub"; }
+
+ void print(ostream& out) const {
+ out << "mon_scrub(" << get_opname((op_type_t)op);
+ out << " v " << version;
+ if (op == OP_RESULT)
+ out << " " << result;
+ out << ")";
+ }
+
+ void encode_payload(uint64_t features) {
+ uint8_t o = op;
+ ::encode(o, payload);
+ ::encode(version, payload);
+ ::encode(result, payload);
+ }
+
+ void decode_payload() {
+ bufferlist::iterator p = payload.begin();
+ uint8_t o;
+ ::decode(o, p);
+ op = (op_type_t)o;
+ ::decode(version, p);
+ ::decode(result, p);
+ }
+};
+
+#endif /* CEPH_MMONSCRUB_H */
#include "messages/MMonCommand.h"
#include "messages/MMonCommandAck.h"
#include "messages/MMonSync.h"
+#include "messages/MMonScrub.h"
#include "messages/MMonProbe.h"
#include "messages/MMonJoin.h"
#include "messages/MMonPaxos.h"
quorum.clear();
outside_quorum.clear();
+ scrub_reset();
+
paxos->restart();
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
reply_command(m, 0, ss.str(), rdata, 0);
return;
}
+ if (m->cmd[0] == "scrub") {
+ if (is_leader()) {
+ int r = scrub();
+ reply_command(m, r, "", rdata, 0);
+ } else if (is_peon()) {
+ forward_request_leader(m);
+ } else {
+ reply_command(m, -EAGAIN, "no quorum", rdata, 0);
+ }
+ return;
+ }
if (m->cmd[0] == "log") {
if (!access_r) {
r = -EACCES;
handle_sync(static_cast<MMonSync*>(m));
break;
+ case MSG_MON_SCRUB:
+ handle_scrub(static_cast<MMonScrub*>(m));
+ break;
+
// OSDs
case MSG_OSD_MARK_ME_DOWN:
case MSG_OSD_FAILURE:
+// ----------------------------------------------
+// scrub
+
+int Monitor::scrub()
+{
+ dout(10) << __func__ << dendl;
+ assert(is_leader());
+
+ if ((get_quorum_features() & CEPH_FEATURE_MON_SCRUB) == 0) {
+ clog.warn() << "scrub not supported by entire quorum\n";
+ return -EOPNOTSUPP;
+ }
+
+ scrub_result.clear();
+ scrub_version = paxos->get_version();
+
+ for (set<int>::iterator p = quorum.begin();
+ p != quorum.end();
+ ++p) {
+ if (*p == rank)
+ continue;
+ MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version);
+ messenger->send_message(r, monmap->get_inst(*p));
+ }
+
+ // scrub my keys
+ _scrub(&scrub_result[rank]);
+
+ if (scrub_result.size() == quorum.size())
+ scrub_finish();
+
+ return 0;
+}
+
+void Monitor::handle_scrub(MMonScrub *m)
+{
+ dout(10) << __func__ << " " << *m << dendl;
+ switch (m->op) {
+ case MMonScrub::OP_SCRUB:
+ {
+ if (!is_peon())
+ break;
+ if (m->version != paxos->get_version())
+ break;
+ MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT, m->version);
+ _scrub(&reply->result);
+ messenger->send_message(reply, m->get_connection());
+ }
+ break;
+ case MMonScrub::OP_RESULT:
+ {
+ if (!is_leader())
+ break;
+ if (m->version != scrub_version)
+ break;
+ int from = m->get_source().num();
+ assert(scrub_result.count(from) == 0);
+ scrub_result[from] = m->result;
+
+ if (scrub_result.size() == quorum.size())
+ scrub_finish();
+ }
+ break;
+ }
+ m->put();
+}
+
+void Monitor::_scrub(ScrubResult *r)
+{
+ set<string> prefixes = get_sync_targets_names();
+ prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
+
+ dout(10) << __func__ << " prefixes " << prefixes << dendl;
+
+ pair<string,string> start;
+ MonitorDBStore::Synchronizer synchronizer = store->get_synchronizer(start, prefixes);
+
+ while (synchronizer->has_next_chunk()) {
+ pair<string,string> k = synchronizer->get_next_key();
+ bufferlist bl;
+ store->get(k.first, k.second, bl);
+ dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes crc " << bl.crc32c(0) << dendl;
+ r->prefix_keys[k.first]++;
+ if (r->prefix_crc.count(k.first) == 0)
+ r->prefix_crc[k.first] = 0;
+ r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
+ }
+}
+
+void Monitor::scrub_finish()
+{
+ dout(10) << __func__ << dendl;
+
+ // compare
+ int errors = 0;
+ ScrubResult& mine = scrub_result[rank];
+ for (map<int,ScrubResult>::iterator p = scrub_result.begin();
+ p != scrub_result.end();
+ ++p) {
+ if (p->first == rank)
+ continue;
+ if (p->second != mine) {
+ ++errors;
+ clog.error() << "scrub mismatch" << "\n";
+ clog.error() << " mon." << rank << " " << mine << "\n";
+ clog.error() << " mon." << p->first << " " << p->second << "\n";
+ }
+ }
+ if (!errors)
+ clog.info() << "scrub ok on " << quorum << ": " << mine << "\n";
+
+ scrub_reset();
+}
+
+void Monitor::scrub_reset()
+{
+ dout(10) << __func__ << dendl;
+ scrub_version = 0;
+ scrub_result.clear();
+}
class MMonGetMap;
class MMonGetVersion;
class MMonSync;
+class MMonScrub;
class MMonProbe;
class MMonSubscribe;
class MAuthRotating;
set<string> outside_quorum;
+ /**
+ * @defgroup scrub
+ * @{
+ */
+ version_t scrub_version; ///< paxos version we are scrubbing
+ map<int,ScrubResult> scrub_result; ///< results so far
+
+ /**
+ * trigger a cross-mon scrub
+ *
+ * Verify all mons are storing identical content
+ */
+ int scrub();
+ void handle_scrub(MMonScrub *m);
+ void _scrub(ScrubResult *r);
+ void scrub_finish();
+ void scrub_reset();
+
/**
* @defgroup Synchronization
* @{
void reply_command(MMonCommand *m, int rc, const string &rs, version_t version);
void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version);
- /**
- * Handle Synchronization-related messages.
- */
+
void handle_probe(MMonProbe *m);
/**
* Handle a Probe Operation, replying with our name, quorum and known versions.
if (!tx.empty())
tx.encode(bl);
}
+ virtual pair<string,string> get_next_key() = 0;
};
typedef std::tr1::shared_ptr<StoreIteratorImpl> Synchronizer;
done = true;
}
+ virtual pair<string,string> get_next_key() {
+ assert(iter->valid());
+ pair<string,string> r = iter->raw_key();
+ do {
+ iter->next();
+ } while (iter->valid() && sync_prefixes.count(iter->raw_key().first) == 0);
+ return r;
+ }
+
virtual bool _is_valid() {
return iter->valid();
}
done = true;
}
+ virtual pair<string,string> get_next_key() {
+ // this method is only used by scrub on the whole store
+ // iterator. also, the single prefix iterator has been dropped
+ // in later code. we leave this here only for the benefit of
+ // backporting.
+ assert(0 == "this should not get called");
+ return make_pair(string(), string());
+ }
+
virtual bool _is_valid() {
return iter->valid();
}
#define CEPH_MON_TYPES_H
#include "include/utime.h"
+#include "common/Formatter.h"
#define PAXOS_PGMAP 0 // before osd, for pg kick to behave
#define PAXOS_MDSMAP 1
WRITE_CLASS_ENCODER(DataStats);
+struct ScrubResult {
+ map<string,uint32_t> prefix_crc; ///< prefix -> crc
+ map<string,uint64_t> prefix_keys; ///< prefix -> key count
+
+ bool operator!=(const ScrubResult& other) {
+ return prefix_crc != other.prefix_crc || prefix_keys != other.prefix_keys;
+ }
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(prefix_crc, bl);
+ ::encode(prefix_keys, bl);
+ ENCODE_FINISH(bl);
+ }
+ void decode(bufferlist::iterator& p) {
+ DECODE_START(1, p);
+ ::decode(prefix_crc, p);
+ ::decode(prefix_keys, p);
+ DECODE_FINISH(p);
+ }
+ void dump(Formatter *f) const {
+ f->open_object_section("crc");
+ for (map<string,uint32_t>::const_iterator p = prefix_crc.begin(); p != prefix_crc.end(); ++p)
+ f->dump_unsigned(p->first.c_str(), p->second);
+ f->close_section();
+ f->open_object_section("keys");
+ for (map<string,uint64_t>::const_iterator p = prefix_keys.begin(); p != prefix_keys.end(); ++p)
+ f->dump_unsigned(p->first.c_str(), p->second);
+ f->close_section();
+ }
+ static void generate_test_instances(list<ScrubResult*>& ls) {
+ ls.push_back(new ScrubResult);
+ ls.push_back(new ScrubResult);
+ ls.back()->prefix_crc["foo"] = 123;
+ ls.back()->prefix_keys["bar"] = 456;
+ }
+};
+WRITE_CLASS_ENCODER(ScrubResult);
+
+static inline ostream& operator<<(ostream& out, const ScrubResult& r) {
+ return out << "ScrubResult(keys " << r.prefix_keys << " crc " << r.prefix_crc << ")";
+}
+
#endif
#include "messages/MMonJoin.h"
#include "messages/MMonElection.h"
#include "messages/MMonSync.h"
+#include "messages/MMonScrub.h"
#include "messages/MLog.h"
#include "messages/MLogAck.h"
case MSG_MON_SYNC:
m = new MMonSync;
break;
+ case MSG_MON_SCRUB:
+ m = new MMonScrub;
+ break;
case MSG_LOG:
m = new MLog;
#include "common/config.h"
// monitor internal
+#define MSG_MON_SCRUB 64
#define MSG_MON_ELECTION 65
#define MSG_MON_PAXOS 66
#define MSG_MON_PROBE 67