// assemble buffer
bufferlist bl;
-
+
// build buffer
ondisklog.tail = 0;
ondisklog.block_map.clear();
p != log.log.end();
p++) {
uint64_t startoff = bl.length();
- ::encode(*p, bl);
- uint64_t endoff = bl.length();
+
+ bufferlist ebl(sizeof(*p)*2);
+ ::encode(*p, ebl);
+ __u32 crc = ebl.crc32c(0);
+ ::encode(ebl, bl);
+ ::encode(crc, bl);
+ uint64_t endoff = ebl.length();
if (startoff / 4096 != endoff / 4096) {
// we reached a new block. *p was the last entry with bytes in previous block
ondisklog.block_map[startoff] = p->version;
*/
}
ondisklog.head = bl.length();
-
+ ondisklog.has_checksums = true;
+
// write it
t.remove(meta_coll, log_oid );
t.write(meta_coll, log_oid , 0, bl.length(), bl);
// log mutation
log.add(e);
- ::encode(e, log_bl);
+ if (ondisklog.has_checksums) {
+ bufferlist ebl(sizeof(e)*2);
+ ::encode(e, ebl);
+ __u32 crc = ebl.crc32c(0);
+ ::encode(ebl, log_bl);
+ ::encode(crc, log_bl);
+ } else {
+ ::encode(e, log_bl);
+ }
dout(10) << "add_log_entry " << e << dendl;
}
bool reorder = false;
while (!p.end()) {
uint64_t pos = ondisklog.tail + p.get_off();
- ::decode(e, p);
+ if (ondisklog.has_checksums) {
+ bufferlist ebl;
+ ::decode(ebl, p);
+ __u32 crc;
+ ::decode(crc, p);
+
+ __u32 got = ebl.crc32c(0);
+ if (crc == got) {
+ bufferlist::iterator q = ebl.begin();
+ ::decode(e, q);
+ } else {
+ dout(0) << "read_log " << pos << " bad crc got " << got << " expected" << crc << dendl;
+
+ // ....
+ assert("do something smart here..." == 0);
+ }
+ } else {
+ ::decode(e, p);
+ }
dout(20) << "read_log " << pos << " " << e << dendl;
// [repair] in order?
// ok
uint64_t tail; // first byte of log.
uint64_t head; // byte following end of log.
+ bool has_checksums;
map<uint64_t,eversion_t> block_map; // offset->version of _last_ entry with _any_ bytes in each block
OndiskLog() : tail(0), head(0) {}
}
void encode(bufferlist& bl) const {
- __u8 struct_v = 1;
+ __u8 struct_v = 2;
::encode(struct_v, bl);
::encode(tail, bl);
::encode(head, bl);
void decode(bufferlist::iterator& bl) {
__u8 struct_v;
::decode(struct_v, bl);
+ has_checksums = (struct_v >= 2);
::decode(tail, bl);
::decode(head, bl);
}