OPTION(mon_data_avail_warn, OPT_INT, 30)
OPTION(mon_data_size_warn, OPT_U64, 15*1024*1024*1024) // issue a warning when the monitor's data store goes over 15GB (in bytes)
OPTION(mon_scrub_interval, OPT_INT, 3600*24) // once a day
+OPTION(mon_scrub_max_keys, OPT_INT, -1) // max number of keys to scrub each time
OPTION(mon_config_key_max_entry_size, OPT_INT, 4096) // max num bytes per config-key entry
OPTION(mon_sync_timeout, OPT_DOUBLE, 60.0)
OPTION(mon_sync_max_payload_size, OPT_U32, 1048576) // max size for a sync chunk payload (say, 1MB)
messenger->send_message(r, monmap->get_inst(*p));
}
+ scrub_state.reset(new ScrubState);
+
// scrub my keys
- _scrub(&scrub_result[rank]);
+ pair<string,string> start;
+ bool r = _scrub(&scrub_result[rank], start, cct->_conf->mon_scrub_max_keys);
+ assert(!r);
+
+ scrub_state.reset();
if (scrub_result.size() == quorum.size())
scrub_finish();
if (m->version != paxos->get_version())
break;
MMonScrub *reply = new MMonScrub(MMonScrub::OP_RESULT, m->version);
- _scrub(&reply->result);
+ pair<string,string> start;
+ _scrub(&reply->result, start, cct->_conf->mon_scrub_max_keys);
m->get_connection()->send_message(reply);
}
break;
m->put();
}
-void Monitor::_scrub(ScrubResult *r)
+bool Monitor::_scrub(ScrubResult *r,
+ pair<string,string> &start,
+ int num_keys)
{
set<string> prefixes = get_sync_targets_names();
prefixes.erase("paxos"); // exclude paxos, as this one may have extra states for proposals, etc.
dout(10) << __func__ << " prefixes " << prefixes << dendl;
- if (scrub_state) {
- dout(10) << __func__ << " scrub already in progress" << dendl;
- return;
- }
+ MonitorDBStore::Synchronizer it = store->get_synchronizer(start, prefixes);
- scrub_state.reset(new ScrubState);
- MonitorDBStore::Synchronizer it =
- store->get_synchronizer(scrub_state->last_key, prefixes);
+ int scrubbed_keys = 0;
while (it->has_next_chunk()) {
+
+ if (num_keys > 0 && scrubbed_keys == num_keys)
+ break;
+
pair<string,string> k = it->get_next_key();
bufferlist bl;
store->get(k.first, k.second, bl);
r->prefix_crc[k.first] = 0;
r->prefix_crc[k.first] = bl.crc32c(r->prefix_crc[k.first]);
}
- scrub_state->last_key = it->get_last_key();
- scrub_state.reset();
+ if (scrub_state) // leader
+ scrub_state->last_key = it->get_last_key();
+
+ return it->has_next_chunk();
}
void Monitor::scrub_finish()