]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: implement simple 'scrub' command
authorSage Weil <sage@inktank.com>
Mon, 8 Jul 2013 22:07:57 +0000 (15:07 -0700)
committerSage Weil <sage@inktank.com>
Mon, 8 Jul 2013 23:11:51 +0000 (16:11 -0700)
Compare all keys within the sync'ed prefixes across members of the quorum
and compare the key counts and CRC for inconsistencies.

Currently this is a one-shot inefficient hammer.  We'll want to make this
work in chunks before it is usable in production environments.

Protect with a feature bit to avoid sending MMonScrub to mons who can't
decode it.

Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Greg Farnum <greg@inktank.com>
(cherry picked from commit a9906641a1dce150203b72682da05651e4d68ff5)

Conflicts:

src/mon/MonCommands.h
src/mon/Monitor.cc

src/Makefile.am
src/include/ceph_features.h
src/messages/MMonScrub.h [new file with mode: 0644]
src/mon/Monitor.cc
src/mon/Monitor.h
src/mon/MonitorDBStore.h
src/mon/mon_types.h
src/msg/Message.cc
src/msg/Message.h

index bbd38e618434c94cd3424958a89c4047cf30b501..de760788958ec249f3db08fefff02d3b06fb6066 100644 (file)
@@ -1878,6 +1878,7 @@ noinst_HEADERS = \
         messages/MMonMap.h\
         messages/MMonPaxos.h\
         messages/MMonProbe.h\
+        messages/MMonScrub.h \
         messages/MMonSubscribe.h\
         messages/MMonSubscribeAck.h\
         messages/MMonSync.h \
index b5a01c5647df4d558be1ddeaee7bc463fa64eda4..ce7f123c6bfe6a514680d537aa4514867075137a 100644 (file)
@@ -37,6 +37,7 @@
 #define CEPH_FEATURE_OSDHASHPSPOOL  (1ULL<<30)
 #define CEPH_FEATURE_MON_SINGLE_PAXOS (1ULL<<31)
 #define CEPH_FEATURE_OSD_SNAPMAPPER (1ULL<<32)
+#define CEPH_FEATURE_MON_SCRUB      (1ULL<<33)
 
 /*
  * Features supported.  Should be everything above.
@@ -74,7 +75,9 @@
         CEPH_FEATURE_MDSENC |                  \
         CEPH_FEATURE_OSDHASHPSPOOL |       \
         CEPH_FEATURE_MON_SINGLE_PAXOS |    \
-   CEPH_FEATURE_OSD_SNAPMAPPER)
+        CEPH_FEATURE_OSD_SNAPMAPPER |      \
+        CEPH_FEATURE_MON_SCRUB |           \
+        0ULL)
 
 #define CEPH_FEATURES_SUPPORTED_DEFAULT  CEPH_FEATURES_ALL
 
diff --git a/src/messages/MMonScrub.h b/src/messages/MMonScrub.h
new file mode 100644 (file)
index 0000000..ab4588f
--- /dev/null
@@ -0,0 +1,78 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+* Ceph - scalable distributed file system
+*
+* Copyright (C) 2013 Inktank, Inc.
+*
+* This is free software; you can redistribute it and/or
+* modify it under the terms of the GNU Lesser General Public
+* License version 2.1, as published by the Free Software
+* Foundation. See file COPYING.
+*/
+#ifndef CEPH_MMONSCRUB_H
+#define CEPH_MMONSCRUB_H
+
+#include "msg/Message.h"
+#include "mon/mon_types.h"
+
+class MMonScrub : public Message
+{
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+
+public:
+  typedef enum {
+    OP_SCRUB = 1,         // leader->peon: scrub (a range of) keys
+    OP_RESULT = 2,        // peon->leader: result of a scrub
+  } op_type_t;
+
+  static const char *get_opname(op_type_t op) {
+    switch (op) {
+    case OP_SCRUB: return "scrub";
+    case OP_RESULT: return "result";
+    default: assert("unknown op type"); return NULL;
+    }
+  }
+
+  op_type_t op;
+  version_t version;
+  ScrubResult result;
+
+  MMonScrub()
+    : Message(MSG_MON_SCRUB, HEAD_VERSION, COMPAT_VERSION)
+  { }
+
+  MMonScrub(op_type_t op, version_t v)
+    : Message(MSG_MON_SCRUB, HEAD_VERSION, COMPAT_VERSION),
+      op(op), version(v)
+  { }
+
+  const char *get_type_name() const { return "mon_scrub"; }
+
+  void print(ostream& out) const {
+    out << "mon_scrub(" << get_opname((op_type_t)op);
+    out << " v " << version;
+    if (op == OP_RESULT)
+      out << " " << result;
+    out << ")";
+  }
+
+  void encode_payload(uint64_t features) {
+    uint8_t o = op;
+    ::encode(o, payload);
+    ::encode(version, payload);
+    ::encode(result, payload);
+  }
+
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    uint8_t o;
+    ::decode(o, p);
+    op = (op_type_t)o;
+    ::decode(version, p);
+    ::decode(result, p);
+  }
+};
+
+#endif /* CEPH_MMONSCRUB_H */
index 9d436f1e5690595b0a0f32383eae5fa9d0747088..bee72a71a5e4352bf493b861f808439360af4fcd 100644 (file)
@@ -37,6 +37,7 @@
 #include "messages/MMonCommand.h"
 #include "messages/MMonCommandAck.h"
 #include "messages/MMonSync.h"
+#include "messages/MMonScrub.h"
 #include "messages/MMonProbe.h"
 #include "messages/MMonJoin.h"
 #include "messages/MMonPaxos.h"
@@ -720,6 +721,8 @@ void Monitor::reset()
   quorum.clear();
   outside_quorum.clear();
 
+  scrub_reset();
+
   paxos->restart();
 
   for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); ++p)
@@ -2580,6 +2583,17 @@ void Monitor::handle_command(MMonCommand *m)
     reply_command(m, 0, ss.str(), rdata, 0);
     return;
   }
+  if (m->cmd[0] == "scrub") {
+    if (is_leader()) {
+      int r = scrub();
+      reply_command(m, r, "", rdata, 0);
+    } else if (is_peon()) {
+      forward_request_leader(m);
+    } else {
+      reply_command(m, -EAGAIN, "no quorum", rdata, 0);
+    }
+    return;
+  }
   if (m->cmd[0] == "log") {
     if (!access_r) {
       r = -EACCES;
@@ -3260,6 +3274,10 @@ bool Monitor::_ms_dispatch(Message *m)
       handle_sync(static_cast<MMonSync*>(m));
       break;
 
+    case MSG_MON_SCRUB:
+      handle_scrub(static_cast<MMonScrub*>(m));
+      break;
+
       // OSDs
     case MSG_OSD_MARK_ME_DOWN:
     case MSG_OSD_FAILURE:
@@ -3913,7 +3931,127 @@ void Monitor::handle_mon_get_map(MMonGetMap *m)
 
 
 
+// ----------------------------------------------
+// scrub
+
+int Monitor::scrub()
+{
+  dout(10) << __func__ << dendl;
+  assert(is_leader());
+
+  if ((get_quorum_features() & CEPH_FEATURE_MON_SCRUB) == 0) {
+    clog.warn() << "scrub not supported by entire quorum\n";
+    return -EOPNOTSUPP;
+  }
+
+  scrub_result.clear();
+  scrub_version = paxos->get_version();
+
+  for (set<int>::iterator p = quorum.begin();
+       p != quorum.end();
+       ++p) {
+    if (*p == rank)
+      continue;
+    MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version);
+    messenger->send_message(r, monmap->get_inst(*p));
+  }
+
+  // scrub my keys
+  _scrub(&scrub_result[rank]);
+
+  if (scrub_result.size() == quorum.size())
+    scrub_finish();
+
+  return 0;
+}
+
+void Monitor::handle_scrub(MMonScrub *m)
+{
+  dout(10) << __func__ << " " << *m << dendl;
+  switch (m->op) {
+  case MMonScrub::OP_SCRUB:
+    {
+      if (!is_peon())
+       break;
+      if (m->version != paxos->get_version())
+       break;
+      MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT, m->version);
+      _scrub(&reply->result);
+      messenger->send_message(reply, m->get_connection());
+    }
+    break;
 
+  case MMonScrub::OP_RESULT:
+    {
+      if (!is_leader())
+       break;
+      if (m->version != scrub_version)
+       break;
+      int from = m->get_source().num();
+      assert(scrub_result.count(from) == 0);
+      scrub_result[from] = m->result;
+
+      if (scrub_result.size() == quorum.size())
+       scrub_finish();
+    }
+    break;
+  }
+  m->put();
+}
+
+void Monitor::_scrub(ScrubResult *r)
+{
+  set<string> prefixes = get_sync_targets_names();
+  prefixes.erase("paxos");  // exclude paxos, as this one may have extra states for proposals, etc.
+
+  dout(10) << __func__ << " prefixes " << prefixes << dendl;
+
+  pair<string,string> start;
+  MonitorDBStore::Synchronizer synchronizer = store->get_synchronizer(start, prefixes);
+
+  while (synchronizer->has_next_chunk()) {
+    pair<string,string> k = synchronizer->get_next_key();
+    bufferlist bl;
+    store->get(k.first, k.second, bl);
+    dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes crc " << bl.crc32c(0) << dendl;
+    r->prefix_keys[k.first]++;
+    if (r->prefix_crc.count(k.first) == 0)
+      r->prefix_crc[k.first] = 0;
+    r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
+  }
+}
+
+void Monitor::scrub_finish()
+{
+  dout(10) << __func__ << dendl;
+
+  // compare
+  int errors = 0;
+  ScrubResult& mine = scrub_result[rank];
+  for (map<int,ScrubResult>::iterator p = scrub_result.begin();
+       p != scrub_result.end();
+       ++p) {
+    if (p->first == rank)
+      continue;
+    if (p->second != mine) {
+      ++errors;
+      clog.error() << "scrub mismatch" << "\n";
+      clog.error() << " mon." << rank << " " << mine << "\n";
+      clog.error() << " mon." << p->first << " " << p->second << "\n";
+    }
+  }
+  if (!errors)
+    clog.info() << "scrub ok on " << quorum << ": " << mine << "\n";
+
+  scrub_reset();
+}
+
+void Monitor::scrub_reset()
+{
+  dout(10) << __func__ << dendl;
+  scrub_version = 0;
+  scrub_result.clear();
+}
 
 
 
index 9e31bebd6d9c617cf8810830c09ebdb25e329ffe..1dd100f6616a18f50b73391e0c10f051b81bfdf5 100644 (file)
@@ -92,6 +92,7 @@ class AdminSocketHook;
 class MMonGetMap;
 class MMonGetVersion;
 class MMonSync;
+class MMonScrub;
 class MMonProbe;
 class MMonSubscribe;
 class MAuthRotating;
@@ -197,6 +198,24 @@ private:
 
   set<string> outside_quorum;
 
+  /**
+   * @defgroup scrub
+   * @{
+   */
+  version_t scrub_version;            ///< paxos version we are scrubbing
+  map<int,ScrubResult> scrub_result;  ///< results so far
+
+  /**
+   * trigger a cross-mon scrub
+   *
+   * Verify all mons are storing identical content
+   */
+  int scrub();
+  void handle_scrub(MMonScrub *m);
+  void _scrub(ScrubResult *r);
+  void scrub_finish();
+  void scrub_reset();
+
   /**
    * @defgroup Synchronization
    * @{
@@ -1283,9 +1302,7 @@ public:
   void reply_command(MMonCommand *m, int rc, const string &rs, version_t version);
   void reply_command(MMonCommand *m, int rc, const string &rs, bufferlist& rdata, version_t version);
 
-  /**
-   * Handle Synchronization-related messages.
-   */
+
   void handle_probe(MMonProbe *m);
   /**
    * Handle a Probe Operation, replying with our name, quorum and known versions.
index a4aadc7161ab0a52572a484baf3a50fd0bf9aa77..c140719981b6f6ab73f3159614f2d7563416aec0 100644 (file)
@@ -298,6 +298,7 @@ class MonitorDBStore
       if (!tx.empty())
        tx.encode(bl);
     }
+    virtual pair<string,string> get_next_key() = 0;
   };
   typedef std::tr1::shared_ptr<StoreIteratorImpl> Synchronizer;
 
@@ -342,6 +343,15 @@ class MonitorDBStore
       done = true;
     }
 
+    virtual pair<string,string> get_next_key() {
+      assert(iter->valid());
+      pair<string,string> r = iter->raw_key();
+      do {
+       iter->next();
+      } while (iter->valid() && sync_prefixes.count(iter->raw_key().first) == 0);
+      return r;
+    }
+
     virtual bool _is_valid() {
       return iter->valid();
     }
@@ -376,6 +386,15 @@ class MonitorDBStore
       done = true;
     }
 
+    virtual pair<string,string> get_next_key() {
+      // this method is only used by scrub on the whole store
+      // iterator.  also, the single prefix iterator has been dropped
+      // in later code.  we leave this here only for the benefit of
+      // backporting.
+      assert(0 == "this should not get called");
+      return make_pair(string(), string());
+    }
+
     virtual bool _is_valid() {
       return iter->valid();
     }
index 1f4135c08678c89f50becfca3bc61e1c3a18c992..0eae3b172bff070f70be6f7013e0ae78d662a2cc 100644 (file)
@@ -16,6 +16,7 @@
 #define CEPH_MON_TYPES_H
 
 #include "include/utime.h"
+#include "common/Formatter.h"
 
 #define PAXOS_PGMAP      0  // before osd, for pg kick to behave
 #define PAXOS_MDSMAP     1
@@ -71,4 +72,47 @@ struct DataStats {
 
 WRITE_CLASS_ENCODER(DataStats);
 
+struct ScrubResult {
+  map<string,uint32_t> prefix_crc;  ///< prefix -> crc
+  map<string,uint64_t> prefix_keys; ///< prefix -> key count
+
+  bool operator!=(const ScrubResult& other) {
+    return prefix_crc != other.prefix_crc || prefix_keys != other.prefix_keys;
+  }
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(prefix_crc, bl);
+    ::encode(prefix_keys, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::iterator& p) {
+    DECODE_START(1, p);
+    ::decode(prefix_crc, p);
+    ::decode(prefix_keys, p);
+    DECODE_FINISH(p);
+  }
+  void dump(Formatter *f) const {
+    f->open_object_section("crc");
+    for (map<string,uint32_t>::const_iterator p = prefix_crc.begin(); p != prefix_crc.end(); ++p)
+      f->dump_unsigned(p->first.c_str(), p->second);
+    f->close_section();
+    f->open_object_section("keys");
+    for (map<string,uint64_t>::const_iterator p = prefix_keys.begin(); p != prefix_keys.end(); ++p)
+      f->dump_unsigned(p->first.c_str(), p->second);
+    f->close_section();
+  }
+  static void generate_test_instances(list<ScrubResult*>& ls) {
+    ls.push_back(new ScrubResult);
+    ls.push_back(new ScrubResult);
+    ls.back()->prefix_crc["foo"] = 123;
+    ls.back()->prefix_keys["bar"] = 456;
+  }
+};
+WRITE_CLASS_ENCODER(ScrubResult);
+
+static inline ostream& operator<<(ostream& out, const ScrubResult& r) {
+  return out << "ScrubResult(keys " << r.prefix_keys << " crc " << r.prefix_crc << ")";
+}
+
 #endif
index 77be03a590be0c59e11bcd59612f537bfbe4f44b..756a072cef836d5ca7874f403fca1674097c988b 100644 (file)
@@ -40,6 +40,7 @@ using namespace std;
 #include "messages/MMonJoin.h"
 #include "messages/MMonElection.h"
 #include "messages/MMonSync.h"
+#include "messages/MMonScrub.h"
 
 #include "messages/MLog.h"
 #include "messages/MLogAck.h"
@@ -316,6 +317,9 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot
   case MSG_MON_SYNC:
     m = new MMonSync;
     break;
+  case MSG_MON_SCRUB:
+    m = new MMonScrub;
+    break;
 
   case MSG_LOG:
     m = new MLog;
index 18a64c1d02eb8ebbfd4096a41178f27c721a387d..630d4eaddc5a19d5dda49a0c41167093e237c632 100644 (file)
@@ -33,6 +33,7 @@
 #include "common/config.h"
 
 // monitor internal
+#define MSG_MON_SCRUB              64
 #define MSG_MON_ELECTION           65
 #define MSG_MON_PAXOS              66
 #define MSG_MON_PROBE              67