// Scrub message of type MSG_MON_SCRUB (type name "mon_scrub").
// Elsewhere in this change the leader sends it with OP_SCRUB to the other
// quorum members, and each of them answers with OP_RESULT.
// NOTE: this is a diff excerpt; parts of the class (e.g. the signatures of the
// print and encode functions) are outside the visible hunks.
class MMonScrub : public Message
{
- static const int HEAD_VERSION = 1;
// HEAD_VERSION becomes 2 because the payload gains num_keys and key.
// COMPAT_VERSION stays 1; decode_payload() below only reads the new fields
// when header.version >= 2.
+ static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
public:
op_type_t op;
version_t version;
ScrubResult result;
// num_keys: in a request, the maximum number of keys to scrub in this chunk
// (-1 is used by the leader to mean "all keys"); in a reply, the number of
// keys that were actually scrubbed.
+ int32_t num_keys;
// key: in a request, the (prefix, key) position the chunk starts from; in a
// reply, the last key that was scrubbed.
+ pair<string,string> key;
// Default constructor: num_keys defaults to -1 and key is default-constructed.
// NOTE(review): op and version are not initialized by this constructor -
// presumably they are always filled in by decode_payload(); confirm.
MMonScrub()
- : Message(MSG_MON_SCRUB, HEAD_VERSION, COMPAT_VERSION)
+ : Message(MSG_MON_SCRUB, HEAD_VERSION, COMPAT_VERSION),
+ num_keys(-1)
{ }
// Builds a message for the given operation, store version and key count.
// key is left default-constructed; callers assign it afterwards.
- MMonScrub(op_type_t op, version_t v)
+ MMonScrub(op_type_t op, version_t v, int32_t num_keys)
: Message(MSG_MON_SCRUB, HEAD_VERSION, COMPAT_VERSION),
- op(op), version(v)
+ op(op), version(v), num_keys(num_keys)
{ }
const char *get_type_name() const { return "mon_scrub"; }
// (body of the print function; its signature is outside this hunk)
out << " v " << version;
if (op == OP_RESULT)
out << " " << result;
// num_keys and key are printed for every op type, not only OP_RESULT.
+ out << " num_keys " << num_keys;
+ out << " key (" << key << ")";
out << ")";
}
// (body of the encode function; its signature is outside this hunk)
::encode(o, payload);
::encode(version, payload);
::encode(result, payload);
// The new fields are appended after the existing ones.
+ ::encode(num_keys, payload);
+ ::encode(key, payload);
}
void decode_payload() {
op = (op_type_t)o;
::decode(version, p);
::decode(result, p);
// Messages encoded with header version 1 do not carry the new fields; in that
// case num_keys and key keep whatever values the object was constructed with.
+ if (header.version >= 2) {
+ ::decode(num_keys, p);
+ ::decode(key, p);
+ }
}
};
if (prefix == "scrub") {
wait_for_paxos_write();
if (is_leader()) {
- int r = scrub();
+ int r = scrub_start();
reply_command(m, r, "", rdata, 0);
} else if (is_peon()) {
forward_request_leader(m);
// ----------------------------------------------
// scrub
// Starts a new scrub. Must be called on the leader (asserted).
// Clears any previously collected results, records the current paxos version
// as the version being scrubbed, allocates a fresh ScrubState, and then calls
// scrub() to process the first chunk.
// Always returns 0; the return value of scrub() is not inspected.
-int Monitor::scrub()
+int Monitor::scrub_start()
{
dout(10) << __func__ << dendl;
assert(is_leader());
scrub_result.clear();
scrub_version = paxos->get_version();
+ scrub_state.reset(new ScrubState);
+
+ scrub();
+ return 0;
+}
+
// Scrubs one chunk of keys. Must be called on the leader with an active
// scrub_state (both asserted).
// Sends an OP_SCRUB request for the chunk to every other quorum member, then
// scrubs the same chunk locally, storing the local result in
// scrub_result[rank] and advancing scrub_state->last_key.
// With more than one monitor in the quorum, continuing or finishing the scrub
// is left to the code that handles the peers' OP_RESULT replies.
// Always returns 0.
+int Monitor::scrub()
+{
+ assert(is_leader());
+ assert(scrub_state);
+
+ // scrub all keys if we're the only monitor in the quorum
+ int32_t num_keys =
+ (quorum.size() == 1 ? -1 : cct->_conf->mon_scrub_max_keys);
for (set<int>::iterator p = quorum.begin();
p != quorum.end();
++p) {
if (*p == rank)
continue;
- MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version);
+ MMonScrub *r = new MMonScrub(MMonScrub::OP_SCRUB, scrub_version,
+ num_keys);
// The request carries last_key as it is *before* the local _scrub() call
// below updates it, so peers start this chunk from the same key as the leader.
+ r->key = scrub_state->last_key;
messenger->send_message(r, monmap->get_inst(*p));
}
- scrub_state.reset(new ScrubState);
-
// scrub my keys
- pair<string,string> start;
- bool r = _scrub(&scrub_result[rank], start, cct->_conf->mon_scrub_max_keys);
- assert(!r);
// _scrub() overwrites last_key with the last key scrubbed and num_keys with the
// number of keys scrubbed; it returns true when more keys remain.
+ bool r = _scrub(&scrub_result[rank],
+ &scrub_state->last_key,
+ &num_keys);
- scrub_state.reset();
+ scrub_state->finished = !r;
- if (scrub_result.size() == quorum.size())
// With a single-monitor quorum the loop above sent nothing, so finish here.
// The assert holds because num_keys was -1, which makes _scrub() apply no key
// limit and run until its iterator is exhausted.
+ if (quorum.size() == 1) {
+ assert(scrub_state->finished == true);
scrub_finish();
-
+ }
return 0;
}
break;
if (m->version != paxos->get_version())
break;
- MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT, m->version);
- pair<string,string> start;
- _scrub(&reply->result, start, cct->_conf->mon_scrub_max_keys);
+
+ MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT,
+ m->version,
+ m->num_keys);
+
+ reply->key = m->key;
+ _scrub(&reply->result, &reply->key, &reply->num_keys);
m->get_connection()->send_message(reply);
}
break;
assert(scrub_result.count(from) == 0);
scrub_result[from] = m->result;
- if (scrub_result.size() == quorum.size())
- scrub_finish();
+ if (scrub_result.size() == quorum.size()) {
+ scrub_check_results();
+ scrub_result.clear();
+ if (scrub_state->finished)
+ scrub_finish();
+ else
+ scrub();
+ }
}
break;
}
}
// Scrubs one chunk of the store and accumulates the outcome into *r.
//
// r        - result to update: per-prefix key counts (prefix_keys) and
//            per-prefix running CRCs (prefix_crc). Must not be NULL.
// start    - in: key handed to the store synchronizer as the starting point;
//            out: the last key scrubbed by this call. Must not be NULL.
// num_keys - in: maximum number of keys to scrub; a value <= 0 disables the
//            limit; out: the number of keys scrubbed by this call.
//            Must not be NULL.
//
// Returns true if the iterator still has keys left after this call, false
// once it is exhausted.
bool Monitor::_scrub(ScrubResult *r,
- pair<string,string> &start,
- int num_keys)
+ pair<string,string> *start,
+ int *num_keys)
{
+ assert(r != NULL);
+ assert(start != NULL);
+ assert(num_keys != NULL);
+
set<string> prefixes = get_sync_targets_names();
prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
- dout(10) << __func__ << " prefixes " << prefixes << dendl;
+ dout(10) << __func__ << " start (" << *start << ")"
+ << " num_keys " << *num_keys << dendl;
- MonitorDBStore::Synchronizer it = store->get_synchronizer(start, prefixes);
+ MonitorDBStore::Synchronizer it = store->get_synchronizer(*start, prefixes);
int scrubbed_keys = 0;
+ pair<string,string> last_key;
while (it->has_next_chunk()) {
- if (num_keys > 0 && scrubbed_keys == num_keys)
// Stop once the requested number of keys has been scrubbed (limit only applies
// when it is positive).
+ if (*num_keys > 0 && scrubbed_keys == *num_keys)
break;
pair<string,string> k = it->get_next_key();
// Keys whose prefix is not in the scrub set are skipped and do not count
// towards scrubbed_keys.
+ if (prefixes.count(k.first) == 0)
+ continue;
+
bufferlist bl;
store->get(k.first, k.second, bl);
- dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes crc " << bl.crc32c(0) << dendl;
// key_crc is the CRC of this value alone and is only used in the debug line.
+ uint32_t key_crc = bl.crc32c(0);
+ dout(30) << __func__ << " " << k << " bl " << bl.length() << " bytes"
+ << " crc " << key_crc << dendl;
r->prefix_keys[k.first]++;
if (r->prefix_crc.count(k.first) == 0)
r->prefix_crc[k.first] = 0;
// The per-prefix CRC is chained: the previous value seeds the CRC of this value.
r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
+
+ ++scrubbed_keys;
+ last_key = k;
}
- if (scrub_state) // leader
- scrub_state->last_key = it->get_last_key();
+ dout(20) << __func__ << " last_key (" << last_key << ")"
+ << " scrubbed_keys " << scrubbed_keys
+ << " has_next " << it->has_next_chunk() << dendl;
+
// Report progress back through the in/out parameters.
// NOTE(review): if no key was scrubbed in this call, last_key is still the
// default-constructed pair, so *start is overwritten with an empty pair -
// confirm callers are fine with that.
+ *start = last_key;
+ *num_keys = scrubbed_keys;
return it->has_next_chunk();
}
// Result-checking part of what used to be scrub_finish(), split out into its
// own function by this change. Most of the body is outside the visible hunk;
// the visible tail emits a "scrub ok on <quorum>: <mine>" info line through
// clog when no errors were found.
-void Monitor::scrub_finish()
+void Monitor::scrub_check_results()
{
dout(10) << __func__ << dendl;
}
if (!errors)
clog->info() << "scrub ok on " << quorum << ": " << mine << "\n";
+}
// Ends the current scrub: logs the call, then invokes scrub_reset() followed
// by scrub_event_start(). Both helpers are defined outside this view;
// presumably they drop the current scrub state and schedule the next scrub -
// TODO confirm.
+void Monitor::scrub_finish()
+{
+ dout(10) << __func__ << dendl;
scrub_reset();
scrub_event_start();
}