]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: commit large numbers of state values quickly
authorSage Weil <sage@newdream.net>
Fri, 14 Nov 2008 21:48:30 +0000 (13:48 -0800)
committerSage Weil <sage@newdream.net>
Fri, 14 Nov 2008 21:48:30 +0000 (13:48 -0800)
Write them all, then sync once at the end.

Also include some infrastructure for using the latest stashed value
to recover.  Don't use it yet, though.  The interaction with
keeping last_committed and latest stashed values in sync wrt a
failure between the two is a bit tricky.

src/messages/MMonPaxos.h
src/mon/MonitorStore.cc
src/mon/MonitorStore.h
src/mon/Paxos.cc

index ee9b78f1fd9aad7206722625540577d00048e634..4c93aedf0c328911807fda7d1ea5ddd341bb97e3 100644 (file)
@@ -53,6 +53,9 @@ class MMonPaxos : public Message {
   version_t uncommitted_pn;     // previous pn, if we are a LAST with an uncommitted value
   utime_t lease_expire;
 
+  version_t latest_version;
+  bufferlist latest_value;
+
   map<version_t,bufferlist> values;
 
   MMonPaxos() : Message(MSG_MON_PAXOS) {}
@@ -60,7 +63,8 @@ class MMonPaxos : public Message {
     Message(MSG_MON_PAXOS),
     epoch(e),
     op(o), machine_id(mid),
-    first_committed(0), last_committed(0), pn_from(0), pn(0), uncommitted_pn(0) { }
+    first_committed(0), last_committed(0), pn_from(0), pn(0), uncommitted_pn(0),
+    latest_version(0) { }
   
   const char *get_type_name() { return "paxos"; }
   
@@ -69,8 +73,10 @@ class MMonPaxos : public Message {
        << " " << get_opname(op) 
        << " lc " << last_committed
        << " fc " << first_committed
-       << " pn " << pn << " opn " << uncommitted_pn
-       << ")";
+       << " pn " << pn << " opn " << uncommitted_pn;
+    if (latest_version)
+      out << " latest " << latest_version << " (" << latest_value.length() << " bytes)";
+    out << ")";
   }
 
   void encode_payload() {
@@ -83,6 +89,8 @@ class MMonPaxos : public Message {
     ::encode(pn, payload);
     ::encode(uncommitted_pn, payload);
     ::encode(lease_expire, payload);
+    ::encode(latest_version, payload);
+    ::encode(latest_value, payload);
     ::encode(values, payload);
   }
   void decode_payload() {
@@ -96,6 +104,8 @@ class MMonPaxos : public Message {
     ::decode(pn, p);   
     ::decode(uncommitted_pn, p);
     ::decode(lease_expire, p);
+    ::decode(latest_version, p);
+    ::decode(latest_value, p);
     ::decode(values, p);
   }
 };
index 315e02b95cd3af2f6d8311875459422de959291d..063c20a277e952abc7953152656309176185b26d 100644 (file)
@@ -92,6 +92,11 @@ int MonitorStore::mkfs()
   return r;
 }
 
+void MonitorStore::sync()
+{
+  dout(10) << "sync" << dendl;
+  ::sync();
+}
 
 version_t MonitorStore::get_int(const char *a, const char *b)
 {
@@ -120,7 +125,7 @@ version_t MonitorStore::get_int(const char *a, const char *b)
 }
 
 
-void MonitorStore::put_int(version_t val, const char *a, const char *b)
+void MonitorStore::put_int(version_t val, const char *a, const char *b, bool sync)
 {
   char fn[200];
   sprintf(fn, "%s/%s", dir.c_str(), a);
@@ -145,6 +150,8 @@ void MonitorStore::put_int(version_t val, const char *a, const char *b)
   int fd = ::open(tfn, O_WRONLY|O_CREAT, 0644);
   assert(fd >= 0);
   ::write(fd, vs, strlen(vs));
+  if (sync)
+    ::fsync(fd);
   ::close(fd);
   ::rename(tfn, fn);
 }
@@ -231,7 +238,7 @@ int MonitorStore::get_bl_ss(bufferlist& bl, const char *a, const char *b)
   return len;
 }
 
-int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b)
+int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync)
 {
   char fn[200];
   sprintf(fn, "%s/%s", dir.c_str(), a);
@@ -259,7 +266,8 @@ int MonitorStore::put_bl_ss(bufferlist& bl, const char *a, const char *b)
       derr(0) << "put_bl_ss ::write() errored out, errno is " << strerror(errno) << dendl;
   }
 
-  ::fsync(fd);
+  if (sync)
+    ::fsync(fd);
   ::close(fd);
   ::rename(tfn, fn);
 
index 5f95cc2bdc665debb60082682ccd58545f135a33..01bae2bfe7e91e6427a47bc2d73036dc53f659e1 100644 (file)
@@ -32,15 +32,17 @@ public:
   int mount();
   int umount();
 
+  void sync();
+
   // ints (stored as ascii)
   version_t get_int(const char *a, const char *b=0);
-  void put_int(version_t v, const char *a, const char *b=0);
+  void put_int(version_t v, const char *a, const char *b=0, bool sync=true);
 
   // buffers
   // ss and sn varieties.
   bool exists_bl_ss(const char *a, const char *b=0);
   int get_bl_ss(bufferlist& bl, const char *a, const char *b);
-  int put_bl_ss(bufferlist& bl, const char *a, const char *b);
+  int put_bl_ss(bufferlist& bl, const char *a, const char *b, bool sync=true);
   bool exists_bl_sn(const char *a, version_t b) {
     char bs[20];
 #ifdef __LP64__
@@ -59,14 +61,14 @@ public:
 #endif
     return get_bl_ss(bl, a, bs);
   }
-  int put_bl_sn(bufferlist& bl, const char *a, version_t b) {
+  int put_bl_sn(bufferlist& bl, const char *a, version_t b, bool sync=true) {
     char bs[20];
 #ifdef __LP64__
     sprintf(bs, "%lu", b);
 #else
     sprintf(bs, "%llu", b);
 #endif
-    return put_bl_ss(bl, a, bs);
+    return put_bl_ss(bl, a, bs, sync);
   }
 
   int erase_ss(const char *a, const char *b);
index 6ca1c7a082196f9c706168cda731703862720d04..baa67104ddcb326f982175496ecafaeeebfb9f37 100644 (file)
@@ -142,13 +142,31 @@ void Paxos::handle_collect(MMonPaxos *collect)
   last->pn_from = accepted_pn_from;
 
   // and share whatever data we have
-  for (version_t v = collect->last_committed+1;
-       v <= last_committed;
-       v++) {
-    if (mon->store->exists_bl_sn(machine_name, v)) {
-      mon->store->get_bl_sn(last->values[v], machine_name, v);
-      dout(10) << " sharing " << v << " (" 
-              << last->values[v].length() << " bytes)" << dendl;
+  if (collect->last_committed < last_committed) {
+    bufferlist bl;
+    version_t l = get_latest(bl);
+    assert(l <= last_committed);
+
+    version_t v = collect->last_committed;
+
+    // start with a stashed full copy?
+    /* hmm.
+    if (l > v + 10) {
+      last->latest_value.claim(bl);
+      last->latest_version = l;
+      v = l;
+    }
+    */
+
+    // include (remaining) incrementals
+    for (v++;
+        v <= last_committed;
+        v++) {
+      if (mon->store->exists_bl_sn(machine_name, v)) {
+       mon->store->get_bl_sn(last->values[v], machine_name, v);
+       dout(10) << " sharing " << v << " (" 
+                << last->values[v].length() << " bytes)" << dendl;
+      }
     }
   }
 
@@ -187,13 +205,21 @@ void Paxos::handle_last(MMonPaxos *last)
 
   // did we receive a committed value?
   if (last->last_committed > last_committed) {
+    /* hmm.
+    if (last->latest_version) {
+      last_committed = last->latest_value;
+      dout(10) << "stashing latest full value " << last_committed << dendl;
+      stash_latest(last_committed, last->latest_value);
+    }
+    */
     for (version_t v = last_committed+1;
         v <= last->last_committed;
         v++) {
-      mon->store->put_bl_sn(last->values[v], machine_name, v);
+      mon->store->put_bl_sn(last->values[v], machine_name, v, false);
       dout(10) << "committing " << v << " " 
               << last->values[v].length() << " bytes" << dendl;
     }
+    mon->store->sync();
     last_committed = last->last_committed;
     mon->store->put_int(last_committed, machine_name, "last_committed");
     dout(10) << "last_committed now " << last_committed << dendl;
@@ -465,8 +491,9 @@ void Paxos::handle_commit(MMonPaxos *commit)
     assert(p->first == last_committed+1);
     last_committed = p->first;
     dout(10) << " storing " << last_committed << " (" << p->second.length() << " bytes)" << dendl;
-    mon->store->put_bl_sn(p->second, machine_name, last_committed);
+    mon->store->put_bl_sn(p->second, machine_name, last_committed, false);
   }
+  mon->store->sync();
   mon->store->put_int(last_committed, machine_name, "last_committed");
   
   delete commit;
@@ -878,7 +905,8 @@ version_t Paxos::get_latest(bufferlist& bl)
     return 0;
   }
   bufferlist::iterator p = full.begin();
-  ::decode(latest_stashed, p);
+  version_t v;
+  ::decode(v, p);
   ::decode(bl, p);
   dout(10) << "get_latest v" << latest_stashed << " len " << bl.length() << dendl;
   return latest_stashed;