OPTION(mon_config_key_max_entry_size, OPT_INT) // max num bytes per config-key entry
OPTION(mon_sync_timeout, OPT_DOUBLE)
OPTION(mon_sync_max_payload_size, OPT_U32) // max size for a sync chunk payload (say)
+OPTION(mon_sync_max_payload_keys, OPT_INT)
OPTION(mon_sync_debug, OPT_BOOL) // enable sync-specific debug
OPTION(mon_inject_sync_get_chunk_delay, OPT_DOUBLE) // inject N second delay on each get_chunk request
OPTION(mon_osd_force_trim_to, OPT_INT) // force mon to trim maps to this point, regardless of min_last_epoch_clean (dangerous)
.add_service("mon")
.set_description("target max message payload for mon sync"),
+ Option("mon_sync_max_payload_keys", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+ .set_default(2000)
+ .add_service("mon")
+ .set_description("target max keys in message payload for mon sync"),
+
Option("mon_sync_debug", Option::TYPE_BOOL, Option::LEVEL_DEV)
.set_default(false)
.add_service("mon")
MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
auto tx(std::make_shared<MonitorDBStore::Transaction>());
- int left = g_conf()->mon_sync_max_payload_size;
- while (sp.last_committed < paxos->get_version() && left > 0) {
+ int bytes_left = g_conf()->mon_sync_max_payload_size;
+ int keys_left = g_conf()->mon_sync_max_payload_keys;
+ while (sp.last_committed < paxos->get_version() &&
+ bytes_left > 0 &&
+ keys_left > 0) {
bufferlist bl;
sp.last_committed++;
ceph_assert(err == 0);
tx->put(paxos->get_name(), sp.last_committed, bl);
- left -= bl.length();
+ bytes_left -= bl.length();
+ --keys_left;
dout(20) << __func__ << " including paxos state " << sp.last_committed
<< dendl;
}
reply->last_committed = sp.last_committed;
- if (sp.full && left > 0) {
- sp.synchronizer->get_chunk_tx(tx, left);
+ if (sp.full && bytes_left > 0 && keys_left > 0) {
+ sp.synchronizer->get_chunk_tx(tx, bytes_left, keys_left);
sp.last_key = sp.synchronizer->get_last_key();
reply->last_key = sp.last_key;
}
virtual bool has_next_chunk() {
return !done && _is_valid();
}
- virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
+ virtual void get_chunk_tx(TransactionRef tx, uint64_t max_bytes,
+ uint64_t max_keys) = 0;
virtual pair<string,string> get_next_key() = 0;
};
typedef std::shared_ptr<StoreIteratorImpl> Synchronizer;
* differ from the one passed on to the function)
* @param last_key[out] Last key in the chunk
*/
- void get_chunk_tx(TransactionRef tx, uint64_t max_bytes) override {
+ void get_chunk_tx(TransactionRef tx, uint64_t max_bytes,
+ uint64_t max_keys) override {
ceph_assert(done == false);
ceph_assert(iter->valid() == true);
bufferlist value = iter->value();
if (tx->empty() ||
(tx->get_bytes() + value.length() + key.size() +
- prefix.size() < max_bytes)) {
+ prefix.size() < max_bytes &&
+ tx->get_keys() < max_keys)) {
// NOTE: putting every key in a separate transaction is
// questionable as far as efficiency goes
auto tmp(std::make_shared<Transaction>());