]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: add checksums to pg log
authorSage Weil <sage@newdream.net>
Fri, 24 Sep 2010 04:59:05 +0000 (21:59 -0700)
committerSage Weil <sage@newdream.net>
Fri, 24 Sep 2010 04:36:18 +0000 (21:36 -0700)
src/osd/PG.cc
src/osd/PG.h

index 2eef533de12283fb7209aa0ab7772b71a950515d..d5bfc6cb80660e52e03ea5b2c400e47d41e5f0c2 100644 (file)
@@ -2005,7 +2005,7 @@ void PG::write_log(ObjectStore::Transaction& t)
 
   // assemble buffer
   bufferlist bl;
-  
+
   // build buffer
   ondisklog.tail = 0;
   ondisklog.block_map.clear();
@@ -2013,8 +2013,13 @@ void PG::write_log(ObjectStore::Transaction& t)
        p != log.log.end();
        p++) {
     uint64_t startoff = bl.length();
-    ::encode(*p, bl);
-    uint64_t endoff = bl.length();
+
+    bufferlist ebl(sizeof(*p)*2);
+    ::encode(*p, ebl);
+    __u32 crc = ebl.crc32c(0);
+    ::encode(ebl, bl);
+    ::encode(crc, bl);
+    uint64_t endoff = ebl.length();
     if (startoff / 4096 != endoff / 4096) {
       // we reached a new block. *p was the last entry with bytes in previous block
       ondisklog.block_map[startoff] = p->version;
@@ -2028,7 +2033,8 @@ void PG::write_log(ObjectStore::Transaction& t)
     */
   }
   ondisklog.head = bl.length();
-  
+  ondisklog.has_checksums = true;
+
   // write it
   t.remove(meta_coll, log_oid );
   t.write(meta_coll, log_oid , 0, bl.length(), bl);
@@ -2116,7 +2122,15 @@ void PG::add_log_entry(Log::Entry& e, bufferlist& log_bl)
 
   // log mutation
   log.add(e);
-  ::encode(e, log_bl);
+  if (ondisklog.has_checksums) {
+    bufferlist ebl(sizeof(e)*2);
+    ::encode(e, ebl);
+    __u32 crc = ebl.crc32c(0);
+    ::encode(ebl, log_bl);
+    ::encode(crc, log_bl);
+  } else {
+    ::encode(e, log_bl);
+  }
   dout(10) << "add_log_entry " << e << dendl;
 }
 
@@ -2175,7 +2189,25 @@ void PG::read_log(ObjectStore *store)
     bool reorder = false;
     while (!p.end()) {
       uint64_t pos = ondisklog.tail + p.get_off();
-      ::decode(e, p);
+      if (ondisklog.has_checksums) {
+       bufferlist ebl;
+       ::decode(ebl, p);
+       __u32 crc;
+       ::decode(crc, p);
+       
+       __u32 got = ebl.crc32c(0);
+       if (crc == got) {
+         bufferlist::iterator q = ebl.begin();
+         ::decode(e, q);
+       } else {
+         dout(0) << "read_log " << pos << " bad crc got " << got << " expected" << crc << dendl;
+
+         // ....
+         assert("do something smart here..." == 0);
+       }
+      } else {
+       ::decode(e, p);
+      }
       dout(20) << "read_log " << pos << " " << e << dendl;
 
       // [repair] in order?
index fb7e922995603cbc0f9db3e710d772a530e16b69..b6b92beb97b28f7627e3fc25bd25842e29a8f8eb 100644 (file)
@@ -484,6 +484,7 @@ public:
     // ok
     uint64_t tail;                     // first byte of log. 
     uint64_t head;                        // byte following end of log.
+    bool has_checksums;
     map<uint64_t,eversion_t> block_map;  // offset->version of _last_ entry with _any_ bytes in each block
 
     OndiskLog() : tail(0), head(0) {}
@@ -498,7 +499,7 @@ public:
     }
 
     void encode(bufferlist& bl) const {
-      __u8 struct_v = 1;
+      __u8 struct_v = 2;
       ::encode(struct_v, bl);
       ::encode(tail, bl);
       ::encode(head, bl);
@@ -506,6 +507,7 @@ public:
     void decode(bufferlist::iterator& bl) {
       __u8 struct_v;
       ::decode(struct_v, bl);
+      has_checksums = (struct_v >= 2);
       ::decode(tail, bl);
       ::decode(head, bl);
     }