From a9906641a1dce150203b72682da05651e4d68ff5 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 8 Jul 2013 15:07:57 -0700 Subject: [PATCH] mon: implement simple 'scrub' command Compare all keys within the sync'ed prefixes across members of the quorum and compare the key counts and CRC for inconsistencies. Currently this is a one-shot inefficient hammer. We'll want to make this work in chunks before it is usable in production environments. Protect with a feature bit to avoid sending MMonScrub to mons who can't decode it. Signed-off-by: Sage Weil Reviewed-by: Greg Farnum --- src/Makefile.am | 1 + src/include/ceph_features.h | 5 +- src/messages/MMonScrub.h | 78 ++++++++++++++++++++ src/mon/MonCommands.h | 1 + src/mon/Monitor.cc | 139 ++++++++++++++++++++++++++++++++++++ src/mon/Monitor.h | 23 +++++- src/mon/MonitorDBStore.h | 19 +++++ src/mon/mon_types.h | 44 ++++++++++++ src/msg/Message.cc | 4 ++ src/msg/Message.h | 1 + 10 files changed, 311 insertions(+), 4 deletions(-) create mode 100644 src/messages/MMonScrub.h diff --git a/src/Makefile.am b/src/Makefile.am index c1a7e8020ef3d..35608dc9f4e3d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1937,6 +1937,7 @@ noinst_HEADERS = \ messages/MMonMap.h\ messages/MMonPaxos.h\ messages/MMonProbe.h\ + messages/MMonScrub.h \ messages/MMonSubscribe.h\ messages/MMonSubscribeAck.h\ messages/MMonSync.h \ diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h index b5a01c5647df4..ce7f123c6bfe6 100644 --- a/src/include/ceph_features.h +++ b/src/include/ceph_features.h @@ -37,6 +37,7 @@ #define CEPH_FEATURE_OSDHASHPSPOOL (1ULL<<30) #define CEPH_FEATURE_MON_SINGLE_PAXOS (1ULL<<31) #define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32) +#define CEPH_FEATURE_MON_SCRUB (1ULL<<33) /* * Features supported. Should be everything above. @@ -74,7 +75,9 @@ CEPH_FEATURE_MDSENC | \ CEPH_FEATURE_OSDHASHPSPOOL | \ CEPH_FEATURE_MON_SINGLE_PAXOS | \ - CEPH_FEATURE_OSD_SNAPMAPPER) + CEPH_FEATURE_OSD_SNAPMAPPER | \ + CEPH_FEATURE_MON_SCRUB | \ + 0ULL) #define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL diff --git a/src/messages/MMonScrub.h b/src/messages/MMonScrub.h new file mode 100644 index 0000000000000..ab4588f4a76e2 --- /dev/null +++ b/src/messages/MMonScrub.h @@ -0,0 +1,78 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* +* Ceph - scalable distributed file system +* +* Copyright (C) 2013 Inktank, Inc. +* +* This is free software; you can redistribute it and/or +* modify it under the terms of the GNU Lesser General Public +* License version 2.1, as published by the Free Software +* Foundation. See file COPYING. +*/ +#ifndef CEPH_MMONSCRUB_H +#define CEPH_MMONSCRUB_H + +#include "msg/Message.h" +#include "mon/mon_types.h" + +class MMonScrub : public Message +{ + static const int HEAD_VERSION = 1; + static const int COMPAT_VERSION = 1; + +public: + typedef enum { + OP_SCRUB = 1, // leader->peon: scrub (a range of) keys + OP_RESULT = 2, // peon->leader: result of a scrub + } op_type_t; + + static const char *get_opname(op_type_t op) { + switch (op) { + case OP_SCRUB: return "scrub"; + case OP_RESULT: return "result"; + default: assert("unknown op type"); return NULL; + } + } + + op_type_t op; + version_t version; + ScrubResult result; + + MMonScrub() + : Message(MSG_MON_SCRUB, HEAD_VERSION, COMPAT_VERSION) + { } + + MMonScrub(op_type_t op, version_t v) + : Message(MSG_MON_SCRUB, HEAD_VERSION, COMPAT_VERSION), + op(op), version(v) + { } + + const char *get_type_name() const { return "mon_scrub"; } + + void print(ostream& out) const { + out << "mon_scrub(" << get_opname((op_type_t)op); + out << " v " << version; + if (op == OP_RESULT) + out << " " << result; + out << ")"; + } + + void encode_payload(uint64_t features) { + uint8_t o = op; + ::encode(o, payload); + ::encode(version, payload); + ::encode(result, payload); + } + + void decode_payload() { + bufferlist::iterator p = payload.begin(); + uint8_t o; + ::decode(o, p); + op = (op_type_t)o; + ::decode(version, p); + ::decode(result, p); + } +}; + +#endif /* CEPH_MMONSCRUB_H */ diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index 0f5f4096f12ba..93c3801fb0b58 100644 --- a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -162,6 +162,7 @@ COMMAND("auth del " \ * Monitor commands (Monitor.cc) */ COMMAND("compact", "cause compaction of monitor's leveldb storage") +COMMAND("scrub", "scrub the monitor stores") COMMAND("fsid", "show cluster FSID/UUID") COMMAND("log name=logtext,type=CephString,n=N", \ "log supplied text to the monitor log") diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 58a5689f2dbb9..df09ff84799ef 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -38,6 +38,7 @@ #include "messages/MMonCommand.h" #include "messages/MMonCommandAck.h" #include "messages/MMonSync.h" +#include "messages/MMonScrub.h" #include "messages/MMonProbe.h" #include "messages/MMonJoin.h" #include "messages/MMonPaxos.h" @@ -728,6 +729,8 @@ void Monitor::reset() quorum.clear(); outside_quorum.clear(); + scrub_reset(); + paxos->restart(); for (vector::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p) @@ -2660,6 +2663,18 @@ void Monitor::handle_command(MMonCommand *m) return; } + if (prefix == "scrub") { + if (is_leader()) { + int r = scrub(); + reply_command(m, r, "", rdata, 0); + } else if (is_peon()) { + forward_request_leader(m); + } else { + reply_command(m, -EAGAIN, "no quorum", rdata, 0); + } + return; + } + if (prefix == "compact") { if (!access_all) { r = -EACCES; @@ -3319,6 +3334,10 @@ bool Monitor::_ms_dispatch(Message *m) handle_sync(static_cast(m)); break; + case MSG_MON_SCRUB: + handle_scrub(static_cast(m)); + break; + // OSDs case MSG_OSD_MARK_ME_DOWN: case MSG_OSD_FAILURE: @@ -3972,7 +3991,127 @@ void Monitor::handle_mon_get_map(MMonGetMap *m) +// ---------------------------------------------- +// scrub + +int Monitor::scrub() +{ + dout(10) << __func__ << dendl; + assert(is_leader()); + + if ((get_quorum_features() & CEPH_FEATURE_MON_SCRUB) == 0) { + clog.warn() << "scrub not supported by entire quorum\n"; + return -EOPNOTSUPP; + } + + scrub_result.clear(); + scrub_version = paxos->get_version(); + + for (set::iterator p = quorum.begin(); + p != quorum.end(); + ++p) { + if (*p == rank) + continue; + MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version); + messenger->send_message(r, monmap->get_inst(*p)); + } + + // scrub my keys + _scrub(&scrub_result[rank]); + + if (scrub_result.size() == quorum.size()) + scrub_finish(); + + return 0; +} + +void Monitor::handle_scrub(MMonScrub *m) +{ + dout(10) << __func__ << " " << *m << dendl; + switch (m->op) { + case MMonScrub::OP_SCRUB: + { + if (!is_peon()) + break; + if (m->version != paxos->get_version()) + break; + MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT, m->version); + _scrub(&reply->result); + messenger->send_message(reply, m->get_connection()); + } + break; + + case MMonScrub::OP_RESULT: + { + if (!is_leader()) + break; + if (m->version != scrub_version) + break; + int from = m->get_source().num(); + assert(scrub_result.count(from) == 0); + scrub_result[from] = m->result; + + if (scrub_result.size() == quorum.size()) + scrub_finish(); + } + break; + } + m->put(); +} + +void Monitor::_scrub(ScrubResult *r) +{ + set prefixes = get_sync_targets_names(); + prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc. + + dout(10) << __func__ << " prefixes " << prefixes << dendl; + + pair start; + MonitorDBStore::Synchronizer synchronizer = store->get_synchronizer(start, prefixes); + while (synchronizer->has_next_chunk()) { + pair k = synchronizer->get_next_key(); + bufferlist bl; + store->get(k.first, k.second, bl); + dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes crc " << bl.crc32c(0) << dendl; + r->prefix_keys[k.first]++; + if (r->prefix_crc.count(k.first) == 0) + r->prefix_crc[k.first] = 0; + r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]); + } +} + +void Monitor::scrub_finish() +{ + dout(10) << __func__ << dendl; + + // compare + int errors = 0; + ScrubResult& mine = scrub_result[rank]; + for (map::iterator p = scrub_result.begin(); + p != scrub_result.end(); + ++p) { + if (p->first == rank) + continue; + if (p->second != mine) { + ++errors; + clog.error() << "scrub mismatch" << "\n"; + clog.error() << " mon." << rank << " " << mine << "\n"; + clog.error() << " mon." << p->first << " " << p->second << "\n"; + } + } + if (!errors) + clog.info() << "scrub ok on " << quorum << ": " << mine << "\n"; + + scrub_reset(); +} + +void Monitor::scrub_reset() +{ + dout(10) << __func__ << dendl; + scrub_version = 0; + scrub_result.clear(); +} diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 76e19d797c20a..004292b97780e 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -93,6 +93,7 @@ class AdminSocketHook; class MMonGetMap; class MMonGetVersion; class MMonSync; +class MMonScrub; class MMonProbe; class MMonSubscribe; class MAuthRotating; @@ -200,6 +201,24 @@ private: set outside_quorum; + /** + * @defgroup scrub + * @{ + */ + version_t scrub_version; ///< paxos version we are scrubbing + map scrub_result; ///< results so far + + /** + * trigger a cross-mon scrub + * + * Verify all mons are storing identical content + */ + int scrub(); + void handle_scrub(MMonScrub *m); + void _scrub(ScrubResult *r); + void scrub_finish(); + void scrub_reset(); + /** * @defgroup Synchronization * @{ @@ -1287,9 +1306,7 @@ public: void reply_command(MMonCommand *m, int rc, const string &rs, version_t version); void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version); - /** - * Handle Synchronization-related messages. - */ + void handle_probe(MMonProbe *m); /** * Handle a Probe Operation, replying with our name, quorum and known versions. diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h index 32e6c99cab780..8405dbed2af28 100644 --- a/src/mon/MonitorDBStore.h +++ b/src/mon/MonitorDBStore.h @@ -300,6 +300,7 @@ class MonitorDBStore if (!tx.empty()) tx.encode(bl); } + virtual pair get_next_key() = 0; }; typedef std::tr1::shared_ptr Synchronizer; @@ -344,6 +345,15 @@ class MonitorDBStore done = true; } + virtual pair get_next_key() { + assert(iter->valid()); + pair r = iter->raw_key(); + do { + iter->next(); + } while (iter->valid() && sync_prefixes.count(iter->raw_key().first) == 0); + return r; + } + virtual bool _is_valid() { return iter->valid(); } @@ -378,6 +388,15 @@ class MonitorDBStore done = true; } + virtual pair get_next_key() { + // this method is only used by scrub on the whole store + // iterator. also, the single prefix iterator has been dropped + // in later code. we leave this here only for the benefit of + // backporting. + assert(0 == "this should not get called"); + return make_pair(string(), string()); + } + virtual bool _is_valid() { return iter->valid(); } diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index 1f4135c08678c..0eae3b172bff0 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -16,6 +16,7 @@ #define CEPH_MON_TYPES_H #include "include/utime.h" +#include "common/Formatter.h" #define PAXOS_PGMAP 0 // before osd, for pg kick to behave #define PAXOS_MDSMAP 1 @@ -71,4 +72,47 @@ struct DataStats { WRITE_CLASS_ENCODER(DataStats); +struct ScrubResult { + map prefix_crc; ///< prefix -> crc + map prefix_keys; ///< prefix -> key count + + bool operator!=(const ScrubResult& other) { + return prefix_crc != other.prefix_crc || prefix_keys != other.prefix_keys; + } + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(prefix_crc, bl); + ::encode(prefix_keys, bl); + ENCODE_FINISH(bl); + } + void decode(bufferlist::iterator& p) { + DECODE_START(1, p); + ::decode(prefix_crc, p); + ::decode(prefix_keys, p); + DECODE_FINISH(p); + } + void dump(Formatter *f) const { + f->open_object_section("crc"); + for (map::const_iterator p = prefix_crc.begin(); p != prefix_crc.end(); ++p) + f->dump_unsigned(p->first.c_str(), p->second); + f->close_section(); + f->open_object_section("keys"); + for (map::const_iterator p = prefix_keys.begin(); p != prefix_keys.end(); ++p) + f->dump_unsigned(p->first.c_str(), p->second); + f->close_section(); + } + static void generate_test_instances(list& ls) { + ls.push_back(new ScrubResult); + ls.push_back(new ScrubResult); + ls.back()->prefix_crc["foo"] = 123; + ls.back()->prefix_keys["bar"] = 456; + } +}; +WRITE_CLASS_ENCODER(ScrubResult); + +static inline ostream& operator<<(ostream& out, const ScrubResult& r) { + return out << "ScrubResult(keys " << r.prefix_keys << " crc " << r.prefix_crc << ")"; +} + #endif diff --git a/src/msg/Message.cc b/src/msg/Message.cc index a6889d39fdf37..325a54961afa3 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -40,6 +40,7 @@ using namespace std; #include "messages/MMonJoin.h" #include "messages/MMonElection.h" #include "messages/MMonSync.h" +#include "messages/MMonScrub.h" #include "messages/MLog.h" #include "messages/MLogAck.h" @@ -318,6 +319,9 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot case MSG_MON_SYNC: m = new MMonSync; break; + case MSG_MON_SCRUB: + m = new MMonScrub; + break; case MSG_LOG: m = new MLog; diff --git a/src/msg/Message.h b/src/msg/Message.h index 2e3d59b886d7c..9b01af9afb551 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -33,6 +33,7 @@ #include "common/config.h" // monitor internal +#define MSG_MON_SCRUB 64 #define MSG_MON_ELECTION 65 #define MSG_MON_PAXOS 66 #define MSG_MON_PROBE 67 -- 2.39.5