OPTION(mon_client_bytes, OPT_U64, 100ul << 20) // client msg data allowed in memory (in bytes)
OPTION(mon_daemon_bytes, OPT_U64, 400ul << 20) // mds, osd message memory cap (in bytes)
OPTION(mon_max_log_entries_per_event, OPT_INT, 4096)
+OPTION(mon_sync_max_payload_size, OPT_U32, 1048576) // max encoded size (in bytes) of a sync chunk payload; default is 1 MB
+OPTION(mon_sync_debug, OPT_BOOL, false) // enable sync-specific debug (e.g., crc of the entries added to sync chunks)
OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first slurp
OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update
OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity
return db->submit_transaction_sync(dbt);
}
+ /**
+  * Common machinery for iterators that hand out store contents in chunks,
+  * each chunk being a Transaction holding the entries that were read.
+  *
+  * Subclasses provide the actual iteration through _get_chunk() and
+  * _is_valid(). This class bounds the size of a chunk, remembers the last
+  * (prefix, key) pair added and, when mon_sync_debug is set, accumulates
+  * the added entries so that crc() can checksum them.
+  */
+ class StoreIteratorImpl {
+ protected:
+   bool done;                     // true once a subclass has exhausted its iterator
+   pair<string,string> last_key;  // (prefix, key) of the last entry added to a chunk
+   bufferlist crc_bl;             // encoded entries fed to crc(); filled only if mon_sync_debug
+
+   StoreIteratorImpl() : done(false) { }
+   virtual ~StoreIteratorImpl() { }
+
+   /**
+    * Try to add a single entry to the chunk being assembled in @p tx.
+    *
+    * The entry is refused when @p tx already holds something and the
+    * encoded size of @p tx plus the encoded size of the entry would exceed
+    * mon_sync_max_payload_size. An empty @p tx always takes the entry, so a
+    * single oversized entry still makes it into a chunk of its own.
+    *
+    * @param tx      Transaction representing the chunk built so far
+    * @param prefix  Prefix of the entry
+    * @param key     Key of the entry
+    * @param value   Value of the entry
+    * @returns true if the entry was appended to @p tx; false if it did not fit
+    */
+   bool add_chunk_entry(Transaction &tx,
+                        string &prefix,
+                        string &key,
+                        bufferlist &value) {
+     // Encoded form of what the chunk currently holds.
+     bufferlist chunk_bl;
+     tx.encode(chunk_bl);
+
+     // Encoded form of a transaction holding only the candidate entry.
+     Transaction entry_tx;
+     entry_tx.put(prefix, key, value);
+     bufferlist entry_bl;
+     entry_tx.encode(entry_bl);
+
+     const size_t total_len = chunk_bl.length() + entry_bl.length();
+
+     // The size cap only applies once the chunk has at least one entry.
+     if (!tx.empty()) {
+       if (total_len > g_conf->mon_sync_max_payload_size)
+         return false;
+     }
+
+     tx.append(entry_tx);
+     last_key.first = prefix;
+     last_key.second = key;
+
+     if (!g_conf->mon_sync_debug)
+       return true;
+
+     // Debug only: keep what was added so crc() can checksum it later.
+     ::encode(prefix, crc_bl);
+     ::encode(key, crc_bl);
+     ::encode(value, crc_bl);
+     return true;
+   }
+
+   /// Fill @p tx with the next chunk; implemented by each concrete iterator.
+   virtual void _get_chunk(Transaction &tx) = 0;
+   /// Tell whether the underlying iterator still points at an entry.
+   virtual bool _is_valid() = 0;
+
+ public:
+   /**
+    * Checksum of the entries added so far.
+    *
+    * @returns crc32c (seeded with 0) of the accumulated entries when
+    *          mon_sync_debug is set; 0 otherwise
+    */
+   __u32 crc() {
+     if (!g_conf->mon_sync_debug)
+       return 0;
+     return crc_bl.crc32c(0);
+   }
+
+   /// Copy of the (prefix, key) pair of the last entry added to a chunk.
+   pair<string,string> get_last_key() {
+     return last_key;
+   }
+
+   /// True while iteration has not been marked done and the iterator is valid.
+   virtual bool has_next_chunk() {
+     if (done)
+       return false;
+     return _is_valid();
+   }
+
+   /**
+    * Obtain the next chunk.
+    *
+    * @param bl  Receives the encoded transaction for the chunk; left
+    *            untouched if the chunk turned out to be empty
+    */
+   virtual void get_chunk(bufferlist &bl) {
+     Transaction chunk;
+     _get_chunk(chunk);
+     if (chunk.empty())
+       return;
+     chunk.encode(bl);
+   }
+ };
+ // Shared handle through which callers hold a chunk iterator.
+ typedef std::tr1::shared_ptr<StoreIteratorImpl> Synchronizer;
+
+ /**
+  * Chunk iterator over the whole key space that only hands out entries
+  * whose prefix belongs to a given set of prefixes.
+  */
+ class WholeStoreIteratorImpl : public StoreIteratorImpl {
+   KeyValueDB::WholeSpaceIterator iter;  // position in the whole key space
+   set<string> sync_prefixes;            // prefixes whose entries are handed out
+
+ public:
+   /**
+    * @param iter      Iterator, already positioned where iteration starts
+    * @param prefixes  Prefixes to hand out; copied into this object
+    */
+   WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
+                          set<string> &prefixes)
+     : StoreIteratorImpl(),
+       iter(iter),
+       sync_prefixes(prefixes)
+   { }
+
+   virtual ~WholeStoreIteratorImpl() { }
+
+   /**
+    * Obtain a chunk of the store
+    *
+    * Walks the iterator forward, adding every entry whose prefix is in
+    * sync_prefixes to @p tx, until either an entry no longer fits (the
+    * iterator is then left on that entry, so the next chunk starts with
+    * it) or the iterator runs out, in which case iteration is marked done.
+    *
+    * Must only be called while not done and with a valid iterator.
+    *
+    * @param tx  Transaction that will recreate the chunk
+    */
+   virtual void _get_chunk(Transaction &tx) {
+     assert(done == false);
+     assert(iter->valid() == true);
+
+     for (; iter->valid(); iter->next()) {
+       string prefix(iter->raw_key().first);
+       string key(iter->raw_key().second);
+
+       // Entries under prefixes we were not asked for are skipped.
+       if (sync_prefixes.count(prefix) == 0)
+         continue;
+
+       bufferlist value = iter->value();
+       const bool added = add_chunk_entry(tx, prefix, key, value);
+       if (!added)
+         return;  // chunk is full; do not advance past this entry
+     }
+
+     assert(iter->valid() == false);
+     done = true;
+   }
+
+   /// True while the underlying iterator points at an entry.
+   virtual bool _is_valid() {
+     return iter->valid();
+   }
+ };
+
+ /**
+  * Chunk iterator over the entries of a single prefix.
+  */
+ class SinglePrefixStoreIteratorImpl : public StoreIteratorImpl {
+   KeyValueDB::Iterator iter;  // position among the entries being handed out
+   string prefix;              // prefix recorded with every entry added to a chunk
+
+ public:
+   /**
+    * @param iter    Iterator, already positioned where iteration starts
+    * @param prefix  Prefix under which the iterated entries are recorded
+    */
+   SinglePrefixStoreIteratorImpl(KeyValueDB::Iterator iter, string prefix)
+     : StoreIteratorImpl(),
+       iter(iter),
+       prefix(prefix)
+   { }
+
+   virtual ~SinglePrefixStoreIteratorImpl() { }
+
+ private:
+   /**
+    * Obtain a chunk of the prefix
+    *
+    * Adds entries to @p tx until either one no longer fits (the iterator
+    * is then left on that entry, so the next chunk starts with it) or the
+    * iterator runs out, in which case iteration is marked done.
+    *
+    * Must only be called while not done and with a valid iterator.
+    *
+    * @param tx  Transaction that will recreate the chunk
+    */
+   virtual void _get_chunk(Transaction &tx) {
+     assert(done == false);
+     assert(iter->valid() == true);
+
+     for (; iter->valid(); iter->next()) {
+       string key(iter->key());
+       bufferlist value = iter->value();
+       const bool added = add_chunk_entry(tx, prefix, key, value);
+       if (!added)
+         return;  // chunk is full; do not advance past this entry
+     }
+
+     assert(iter->valid() == false);
+     done = true;
+   }
+
+   /// True while the underlying iterator points at an entry.
+   virtual bool _is_valid() {
+     return iter->valid();
+   }
+ };
+
+ /**
+  * Create a chunk iterator over the whole store, restricted to a set of
+  * prefixes, working on a snapshot iterator obtained from the db.
+  *
+  * @param key       (prefix, key) pair to resume from. When both parts are
+  *                  non-empty the iterator is positioned with upper_bound()
+  *                  on that pair; otherwise iteration starts at the first
+  *                  entry.
+  * @param prefixes  Prefixes whose entries are to be handed out
+  * @returns the newly created synchronizer
+  */
+ Synchronizer get_synchronizer(pair<string,string> &key,
+                               set<string> &prefixes) {
+   KeyValueDB::WholeSpaceIterator iter = db->get_snapshot_iterator();
+
+   // A pair with either part empty is treated as "no position given".
+   if (key.first.empty() || key.second.empty())
+     iter->seek_to_first();
+   else
+     iter->upper_bound(key.first, key.second);
+
+   Synchronizer sync(new WholeStoreIteratorImpl(iter, prefixes));
+   return sync;
+ }
+
+ /**
+  * Create a chunk iterator over the entries of a single prefix, working on
+  * a snapshot iterator obtained from the db and starting at its first
+  * entry.
+  *
+  * @param prefix  Prefix to iterate over; must not be empty
+  * @returns the newly created synchronizer
+  */
+ Synchronizer get_synchronizer(string &prefix) {
+   assert(!prefix.empty());
+
+   KeyValueDB::Iterator iter = db->get_snapshot_iterator(prefix);
+   iter->seek_to_first();
+
+   Synchronizer sync(new SinglePrefixStoreIteratorImpl(iter, prefix));
+   return sync;
+ }
+
int get(const string& prefix, const string& key, bufferlist& bl) {
set<string> k;
k.insert(key);