]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: MonitorDBStore: add store iterators to obtain chunks for sync
authorJoao Eduardo Luis <joao.luis@inktank.com>
Wed, 15 Aug 2012 14:35:39 +0000 (15:35 +0100)
committerJoao Eduardo Luis <joao.luis@inktank.com>
Thu, 21 Feb 2013 18:02:22 +0000 (18:02 +0000)
We created an interface specific to the MonitorDBStore, which can be used
to create iterators to obtain chunks for sync.

Two different iterators were defined: one that will iterate over the whole
store, focusing on the specified set of prefixes; another that will
iterate over only one specific prefix.

These two different iterators allow us build the sync process in two
distinct phases: 1) obtain all key/value pairs for paxos and all paxos
services, bundle them in chunks and send them over the wire; and 2) obtain
all the paxos versions, bundle them in chunks and send them over the wire.

Also, we are currently considering a chunk to be (at most) 1 MB worth of
data, although it can be tuned using 'mon_sync_max_payload_size' option.

mon: MonitorDBStore: add crc support when --mon-sync-debug is set

Signed-off-by: Joao Eduardo Luis <joao.luis@inktank.com>
src/common/config_opts.h
src/mon/MonitorDBStore.h

index 7686983af886f3f42f69e0f3091b74b87d2b5ed4..04298fb45d8d693c812d7d4954b629d3b5a7a1fa 100644 (file)
@@ -155,6 +155,8 @@ OPTION(mon_slurp_bytes, OPT_INT, 256*1024)    // limit size of slurp messages
 OPTION(mon_client_bytes, OPT_U64, 100ul << 20)  // client msg data allowed in memory (in bytes)
 OPTION(mon_daemon_bytes, OPT_U64, 400ul << 20)  // mds, osd message memory cap (in bytes)
 OPTION(mon_max_log_entries_per_event, OPT_INT, 4096)
+OPTION(mon_sync_max_payload_size, OPT_U32, 1048576) // max size for a sync chunk payload (say, 1MB)
+OPTION(mon_sync_debug, OPT_BOOL, false) // enable sync-specific debug
 OPTION(paxos_max_join_drift, OPT_INT, 10)       // max paxos iterations before we must first slurp
 OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0)  // gather updates for this long before proposing a map update
 OPTION(paxos_min_wait, OPT_DOUBLE, 0.05)  // min time to gather updates for after period of inactivity
index 911281bc717842af3cbf850bdf2333bd8f3b626d..8463b50a353d0babfd3c68d54bc403683a05d5a4 100644 (file)
@@ -187,6 +187,177 @@ class MonitorDBStore
     return db->submit_transaction_sync(dbt);
   }
 
+  class StoreIteratorImpl {
+  protected:
+    bool done;
+    pair<string,string> last_key;
+    bufferlist crc_bl;
+
+    StoreIteratorImpl() : done(false) { }
+    virtual ~StoreIteratorImpl() { }
+
+    bool add_chunk_entry(Transaction &tx,
+                        string &prefix,
+                        string &key,
+                        bufferlist &value) {
+      Transaction tmp;
+      bufferlist tmp_bl;
+      tmp.put(prefix, key, value);
+      tmp.encode(tmp_bl);
+
+      bufferlist tx_bl;
+      tx.encode(tx_bl);
+
+      size_t len = tx_bl.length() + tmp_bl.length();
+
+      if (!tx.empty() && (len > g_conf->mon_sync_max_payload_size)) {
+       return false;
+      }
+
+      tx.append(tmp);
+      last_key.first = prefix;
+      last_key.second = key;
+
+      if (g_conf->mon_sync_debug) {
+       ::encode(prefix, crc_bl);
+       ::encode(key, crc_bl);
+       ::encode(value, crc_bl);
+      }
+
+      return true;
+    }
+
+    virtual void _get_chunk(Transaction &tx) = 0;
+    virtual bool _is_valid() = 0;
+
+  public:
+    __u32 crc() {
+      if (g_conf->mon_sync_debug)
+       return crc_bl.crc32c(0);
+      return 0;
+    }
+    pair<string,string> get_last_key() {
+      return last_key;
+    };
+    virtual bool has_next_chunk() {
+      return !done && _is_valid();
+    }
+    virtual void get_chunk(bufferlist &bl) {
+      Transaction tx;
+      _get_chunk(tx);
+      if (!tx.empty())
+       tx.encode(bl);
+    }
+  };
+  typedef std::tr1::shared_ptr<StoreIteratorImpl> Synchronizer;
+
+  class WholeStoreIteratorImpl : public StoreIteratorImpl {
+    KeyValueDB::WholeSpaceIterator iter;
+    set<string> sync_prefixes;
+
+  public:
+    WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
+                          set<string> &prefixes)
+      : StoreIteratorImpl(),
+       iter(iter),
+       sync_prefixes(prefixes)
+    { }
+
+    virtual ~WholeStoreIteratorImpl() { }
+
+    /**
+     * Obtain a chunk of the store
+     *
+     * @param bl           Encoded transaction that will recreate the chunk
+     * @param first_key            Pair containing the first key to obtain, and that
+     *                     will contain the first key in the chunk (that may
+     *                     differ from the one passed on to the function)
+     * @param last_key[out] Last key in the chunk
+     */
+    virtual void _get_chunk(Transaction &tx) {
+      assert(done == false);
+      assert(iter->valid() == true);
+
+      while (iter->valid()) {
+       string prefix(iter->raw_key().first);
+       string key(iter->raw_key().second);
+       if (sync_prefixes.count(prefix)) {
+         bufferlist value = iter->value();
+         if (!add_chunk_entry(tx, prefix, key, value))
+           return;
+       }
+       iter->next();
+      }
+      assert(iter->valid() == false);
+      done = true;
+    }
+
+    virtual bool _is_valid() {
+      return iter->valid();
+    }
+  };
+
+  class SinglePrefixStoreIteratorImpl : public StoreIteratorImpl {
+    KeyValueDB::Iterator iter;
+    string prefix;
+
+  public:
+    SinglePrefixStoreIteratorImpl(KeyValueDB::Iterator iter, string prefix)
+      : StoreIteratorImpl(),
+       iter(iter),
+       prefix(prefix)
+    { }
+
+    virtual ~SinglePrefixStoreIteratorImpl() { }
+
+  private:
+    virtual void _get_chunk(Transaction &tx) {
+      assert(done == false);
+      assert(iter->valid() == true);
+
+      while (iter->valid()) {
+       string key(iter->key());
+       bufferlist value = iter->value();
+       if (!add_chunk_entry(tx, prefix, key, value))
+         return;
+       iter->next();
+      }
+      assert(iter->valid() == false);
+      done = true;
+    }
+
+    virtual bool _is_valid() {
+      return iter->valid();
+    }
+  };
+
+  Synchronizer get_synchronizer(pair<string,string> &key,
+                               set<string> &prefixes) {
+    KeyValueDB::WholeSpaceIterator iter;
+    iter = db->get_snapshot_iterator();
+
+    if (!key.first.empty() && !key.second.empty())
+      iter->upper_bound(key.first, key.second);
+    else
+      iter->seek_to_first();
+
+    return std::tr1::shared_ptr<StoreIteratorImpl>(
+       new WholeStoreIteratorImpl(iter, prefixes)
+    );
+  }
+
+  Synchronizer get_synchronizer(string &prefix) {
+    assert(!prefix.empty());
+
+    KeyValueDB::Iterator iter;
+    iter = db->get_snapshot_iterator(prefix);
+    iter->seek_to_first();
+
+    return std::tr1::shared_ptr<StoreIteratorImpl>(
+       new SinglePrefixStoreIteratorImpl(iter, prefix)
+    );
+  }
+
   int get(const string& prefix, const string& key, bufferlist& bl) {
     set<string> k;
     k.insert(key);