From: David Zafman Date: Fri, 8 Feb 2013 22:43:50 +0000 (-0800) Subject: osd: move pg log into leveldb X-Git-Tag: v0.58~53^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1ef94200e9bce5e0f0ac5d1e563421a9d036c203;p=ceph.git osd: move pg log into leveldb log from wip-pginfo Fix bugs in PG::read_log() handling Eliminate compiler warnings Feature #4075: osd: move pg log into leveldb Signed-off-by: Sage Weil Signed-off-by: David Zafman --- diff --git a/src/osd/PG.cc b/src/osd/PG.cc index dd32a3e38ca0..ee1df9fc253f 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -142,7 +142,7 @@ std::string PG::gen_prefix() const -void PG::IndexedLog::trim(ObjectStore::Transaction& t, eversion_t s) +void PG::IndexedLog::trim(ObjectStore::Transaction& t, hobject_t& log_oid, eversion_t s) { if (complete_to != log.end() && complete_to->version <= s) { @@ -150,14 +150,17 @@ void PG::IndexedLog::trim(ObjectStore::Transaction& t, eversion_t s) << " on " << *this << dendl; } + set keys_to_rm; while (!log.empty()) { pg_log_entry_t &e = *log.begin(); if (e.version > s) break; generic_dout(20) << "trim " << e << dendl; unindex(e); // remove from index, + keys_to_rm.insert(e.get_key_name()); log.pop_front(); // from log } + t.omap_rmkeys(coll_t::META_COLL, log_oid, keys_to_rm); // raise tail? if (tail < s) @@ -2398,37 +2401,22 @@ epoch_t PG::peek_map_epoch(ObjectStore *store, coll_t coll, hobject_t &infos_oid void PG::write_log(ObjectStore::Transaction& t) { dout(10) << "write_log" << dendl; - - // assemble buffer - bufferlist bl; - - // build buffer - ondisklog.tail = 0; + t.remove(coll_t::META_COLL, log_oid); + t.touch(coll_t::META_COLL, log_oid); + map keys; for (list::iterator p = log.log.begin(); p != log.log.end(); p++) { - uint64_t startoff = bl.length(); - - bufferlist ebl(sizeof(*p)*2); - ::encode(*p, ebl); - __u32 crc = ebl.crc32c(0); - ::encode(ebl, bl); - ::encode(crc, bl); - - p->offset = startoff; + bufferlist bl(sizeof(*p) * 2); + p->encode_with_checksum(bl); + keys[p->get_key_name()].claim(bl); } - ondisklog.head = bl.length(); - ondisklog.has_checksums = true; + dout(10) << "write_log " << keys.size() << " keys" << dendl; - // write it - t.remove(coll_t::META_COLL, log_oid ); - t.write(coll_t::META_COLL, log_oid , 0, bl.length(), bl); + ::encode(ondisklog.divergent_priors, keys["divergent_priors"]); + + t.omap_setkeys(coll_t::META_COLL, log_oid, keys); - bufferlist blb(sizeof(ondisklog)); - ::encode(ondisklog, blb); - t.collection_setattr(coll, "ondisklog", blb); - - dout(10) << "write_log to " << ondisklog.tail << "~" << ondisklog.length() << dendl; dirty_log = false; } @@ -2452,47 +2440,11 @@ void PG::trim(ObjectStore::Transaction& t, eversion_t trim_to) assert(trim_to <= info.last_complete); dout(10) << "trim " << log << " to " << trim_to << dendl; - log.trim(t, trim_to); + log.trim(t, log_oid, trim_to); info.log_tail = log.tail; - trim_ondisklog(t); } } -void PG::trim_ondisklog(ObjectStore::Transaction& t) -{ - uint64_t new_tail; - if (log.empty()) { - new_tail = ondisklog.head; - } else { - new_tail = log.log.front().offset; - } - bool same_block = (new_tail & ~4095) == (ondisklog.tail & ~4095); - dout(15) << "trim_ondisklog tail " << ondisklog.tail << " -> " << new_tail - << ", now " << new_tail << "~" << (ondisklog.head - new_tail) - << " " << (same_block ? "(same block)" : "(different block)") - << dendl; - assert(new_tail >= ondisklog.tail); - - if (same_block) - return; - - ondisklog.tail = new_tail; - - if (!g_conf->osd_preserve_trimmed_log) { - uint64_t zt = new_tail & ~4095; - if (zt > ondisklog.zero_to) { - t.zero(coll_t::META_COLL, log_oid, ondisklog.zero_to, zt - ondisklog.zero_to); - dout(15) << "trim_ondisklog zeroing from " << ondisklog.zero_to - << " to " << zt << dendl; - ondisklog.zero_to = zt; - } - } - - bufferlist blb(sizeof(ondisklog)); - ::encode(ondisklog, blb); - t.collection_setattr(coll, "ondisklog", blb); -} - void PG::trim_peers() { calc_trim_to(); @@ -2518,16 +2470,9 @@ void PG::add_log_entry(pg_log_entry_t& e, bufferlist& log_bl) // log mutation log.add(e); - if (ondisklog.has_checksums) { - bufferlist ebl(sizeof(e)*2); - ::encode(e, ebl); - __u32 crc = ebl.crc32c(0); - ::encode(ebl, log_bl); - ::encode(crc, log_bl); - } else { - ::encode(e, log_bl); - } dout(10) << "add_log_entry " << e << dendl; + + e.encode_with_checksum(log_bl); } @@ -2536,24 +2481,16 @@ void PG::append_log( { dout(10) << "append_log " << log << " " << logv << dendl; - bufferlist bl; + map keys; for (vector::iterator p = logv.begin(); p != logv.end(); p++) { - p->offset = ondisklog.head + bl.length(); - add_log_entry(*p, bl); + p->offset = 0; + add_log_entry(*p, keys[p->get_key_name()]); } - dout(10) << "append_log " << ondisklog.tail << "~" << ondisklog.length() - << " adding " << bl.length() << dendl; - - t.write(coll_t::META_COLL, log_oid, ondisklog.head, bl.length(), bl ); - ondisklog.head += bl.length(); - - bufferlist blb(sizeof(ondisklog)); - ::encode(ondisklog, blb); - t.collection_setattr(coll, "ondisklog", blb); - dout(10) << "append_log now " << ondisklog.tail << "~" << ondisklog.length() << dendl; + dout(10) << "append_log adding " << keys.size() << " keys" << dendl; + t.omap_setkeys(coll_t::META_COLL, log_oid, keys); trim(t, trim_to); @@ -2718,39 +2655,10 @@ void PG::read_state(ObjectStore *store, bufferlist &bl) osd->infos_oid, snap_collections, info_struct_v); assert(r >= 0); - try { - ostringstream oss; - read_log(store, coll, log_oid, info, ondisklog, log, missing, oss, this); - if (oss.str().length()) - osd->clog.error() << oss.str(); - } - catch (const buffer::error &e) { - string cr_log_coll_name(get_corrupt_pg_log_name()); - dout(0) << "Got exception '" << e.what() << "' while reading log. " - << "Moving corrupted log file to '" << cr_log_coll_name - << "' for later " << "analysis." << dendl; - - ondisklog.zero(); - - // clear log index - log.head = log.tail = info.last_update; - - // reset info - info.log_tail = info.last_update; - - // Move the corrupt log to a new place and create a new zero-length log entry. - ObjectStore::Transaction t; - coll_t cr_log_coll(cr_log_coll_name); - t.create_collection(cr_log_coll); - t.collection_move(cr_log_coll, coll_t::META_COLL, log_oid); - t.touch(coll_t::META_COLL, log_oid); - dirty_info = true; - write_if_dirty(t); - store->apply_transaction(t); - - info.last_backfill = hobject_t(); - info.stats.stats.clear(); - } + ostringstream oss; + read_log(store, coll, log_oid, info, ondisklog, log, missing, oss, this); + if (oss.str().length()) + osd->clog.error() << oss; // log any weirdness log_weirdness(); @@ -5139,16 +5047,125 @@ void PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, pg_missing_t &missing, ostringstream &oss, const PG *passedpg) { - // load bounds - ondisklog.tail = ondisklog.head = 0; + dout(10) << "read_log" << dendl; - bufferlist blb; - store->collection_getattr(coll, "ondisklog", blb); - bufferlist::iterator p = blb.begin(); - ::decode(ondisklog, p); + // legacy? + struct stat st; + int r = store->stat(coll_t::META_COLL, log_oid, &st); + assert(r == 0); + if (st.st_size > 0) { + read_log_old(store, coll, log_oid, info, ondisklog, log, missing, oss, passedpg); + return; + } + + log.tail = info.log_tail; + ObjectMap::ObjectMapIterator p = store->get_omap_iterator(coll_t::META_COLL, log_oid); + if (p) for (p->seek_to_first(); p->valid() ; p->next()) { + bufferlist bl = p->value();//Copy bufferlist before creating iterator + bufferlist::iterator bp = bl.begin(); + if (p->key() == "divergent_priors") { + ::decode(ondisklog.divergent_priors, bp); + dout(20) << "read_log " << ondisklog.divergent_priors.size() << " divergent_priors" << dendl; + } else { + pg_log_entry_t e; + e.decode_with_checksum(bp); + dout(20) << "read_log " << e << dendl; + log.log.push_back(e); + log.head = e.version; + } + } + log.head = info.last_update; + log.index(); - dout(10) << "read_log " << ondisklog.tail << "~" << ondisklog.length() << dendl; + // build missing + if (info.last_complete < info.last_update) { + dout(10) << "read_log checking for missing items over interval (" << info.last_complete + << "," << info.last_update << "]" << dendl; + set did; + for (list::reverse_iterator i = log.log.rbegin(); + i != log.log.rend(); + i++) { + if (i->version <= info.last_complete) break; + if (did.count(i->soid)) continue; + did.insert(i->soid); + + if (i->is_delete()) continue; + + bufferlist bv; + int r = store->getattr(coll, i->soid, OI_ATTR, bv); + if (r >= 0) { + object_info_t oi(bv); + if (oi.version < i->version) { + dout(15) << "read_log missing " << *i << " (have " << oi.version << ")" << dendl; + missing.add(i->soid, i->version, oi.version); + } + } else { + dout(15) << "read_log missing " << *i << dendl; + missing.add(i->soid, i->version, eversion_t()); + } + } + for (map::reverse_iterator i = + ondisklog.divergent_priors.rbegin(); + i != ondisklog.divergent_priors.rend(); + ++i) { + if (i->first <= info.last_complete) break; + if (did.count(i->second)) continue; + did.insert(i->second); + bufferlist bv; + int r = store->getattr(coll, i->second, OI_ATTR, bv); + if (r >= 0) { + object_info_t oi(bv); + /** + * 1) we see this entry in the divergent priors mapping + * 2) we didn't see an entry for this object in the log + * + * From 1 & 2 we know that either the object does not exist + * or it is at the version specified in the divergent_priors + * map since the object would have been deleted atomically + * with the addition of the divergent_priors entry, an older + * version would not have been recovered, and a newer version + * would show up in the log above. + */ + assert(oi.version == i->first); + } else { + dout(15) << "read_log missing " << *i << dendl; + missing.add(i->second, i->first, eversion_t()); + } + } + } + dout(10) << "read_log done" << dendl; +} + +void PG::read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid, + const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, + pg_missing_t &missing, ostringstream &oss, const PG *passedpg) +{ + // load bounds, based on old OndiskLog encoding. + uint64_t ondisklog_tail = 0; + uint64_t ondisklog_head = 0; + uint64_t ondisklog_zero_to; + bool ondisklog_has_checksums; + + bufferlist blb; + store->collection_getattr(coll, "ondisklog", blb); + { + bufferlist::iterator bl = blb.begin(); + DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl); + ondisklog_has_checksums = (struct_v >= 2); + ::decode(ondisklog_tail, bl); + ::decode(ondisklog_head, bl); + if (struct_v >= 4) + ::decode(ondisklog_zero_to, bl); + else + ondisklog_zero_to = 0; + if (struct_v >= 5) + ::decode(ondisklog.divergent_priors, bl); + DECODE_FINISH(bl); + } + uint64_t ondisklog_length = ondisklog_head - ondisklog_tail; + dout(10) << "read_log " << ondisklog_tail << "~" << ondisklog_length << dendl; + log.tail = info.log_tail; // In case of sobject_t based encoding, may need to list objects in the store @@ -5156,15 +5173,15 @@ void PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, bool listed_collection = false; vector ls; - if (ondisklog.head > 0) { + if (ondisklog_head > 0) { // read bufferlist bl; - store->read(coll_t::META_COLL, log_oid, ondisklog.tail, ondisklog.length(), bl); - if (bl.length() < ondisklog.length()) { + store->read(coll_t::META_COLL, log_oid, ondisklog_tail, ondisklog_length, bl); + if (bl.length() < ondisklog_length) { std::ostringstream oss; oss << "read_log got " << bl.length() << " bytes, expected " - << ondisklog.head << "-" << ondisklog.tail << "=" - << ondisklog.length(); + << ondisklog_head << "-" << ondisklog_tail << "=" + << ondisklog_length; throw read_log_error(oss.str().c_str()); } @@ -5174,8 +5191,8 @@ void PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, eversion_t last; bool reorder = false; while (!p.end()) { - uint64_t pos = ondisklog.tail + p.get_off(); - if (ondisklog.has_checksums) { + uint64_t pos = ondisklog_tail + p.get_off(); + if (ondisklog_has_checksums) { bufferlist ebl; ::decode(ebl, p); __u32 crc; @@ -5243,19 +5260,19 @@ void PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, } e.offset = pos; - uint64_t endpos = ondisklog.tail + p.get_off(); + uint64_t endpos = ondisklog_tail + p.get_off(); log.log.push_back(e); last = e.version; // [repair] at end of log? if (!p.end() && e.version == info.last_update) { oss << info.pgid << " log has extra data at " - << endpos << "~" << (ondisklog.head-endpos) << " after " - << info.last_update << "\n"; + << endpos << "~" << (ondisklog_head-endpos) << " after " + << info.last_update << "\n"; dout(0) << "read_log " << endpos << " *** extra gunk at end of log, " - << "adjusting ondisklog.head" << dendl; - ondisklog.head = endpos; + << "adjusting ondisklog_head" << dendl; + ondisklog_head = endpos; break; } } @@ -5270,68 +5287,6 @@ void PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, log.log.push_back(p->second); } } - - log.head = info.last_update; - log.index(); - - // build missing - if (info.last_complete < info.last_update) { - dout(10) << "read_log checking for missing items over interval (" << info.last_complete - << "," << info.last_update << "]" << dendl; - - set did; - for (list::reverse_iterator i = log.log.rbegin(); - i != log.log.rend(); - i++) { - if (i->version <= info.last_complete) break; - if (did.count(i->soid)) continue; - did.insert(i->soid); - - if (i->is_delete()) continue; - - bufferlist bv; - int r = store->getattr(coll, i->soid, OI_ATTR, bv); - if (r >= 0) { - object_info_t oi(bv); - if (oi.version < i->version) { - dout(15) << "read_log missing " << *i << " (have " << oi.version << ")" << dendl; - missing.add(i->soid, i->version, oi.version); - } - } else { - dout(15) << "read_log missing " << *i << dendl; - missing.add(i->soid, i->version, eversion_t()); - } - } - for (map::reverse_iterator i = - ondisklog.divergent_priors.rbegin(); - i != ondisklog.divergent_priors.rend(); - ++i) { - if (i->first <= info.last_complete) break; - if (did.count(i->second)) continue; - did.insert(i->second); - bufferlist bv; - int r = store->getattr(coll, i->second, OI_ATTR, bv); - if (r >= 0) { - object_info_t oi(bv); - /** - * 1) we see this entry in the divergent priors mapping - * 2) we didn't see an entry for this object in the log - * - * From 1 & 2 we know that either the object does not exist - * or it is at the version specified in the divergent_priors - * map since the object would have been deleted atomically - * with the addition of the divergent_priors entry, an older - * version would not have been recovered, and a newer version - * would show up in the log above. - */ - assert(oi.version == i->first); - } else { - dout(15) << "read_log missing " << *i << dendl; - missing.add(i->second, i->first, eversion_t()); - } - } - } - dout(10) << "read_log done" << dendl; } /*------------ Recovery State Machine----------------*/ diff --git a/src/osd/PG.h b/src/osd/PG.h index a60de9a5f201..26c536889715 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -267,7 +267,7 @@ public: caller_ops[e.reqid] = &(log.back()); } - void trim(ObjectStore::Transaction &t, eversion_t s); + void trim(ObjectStore::Transaction &t, hobject_t& oid, eversion_t s); ostream& print(ostream& out) const; }; @@ -334,6 +334,16 @@ public: f->dump_unsigned("head", head); f->dump_unsigned("tail", tail); f->dump_unsigned("zero_to", zero_to); + f->open_array_section("divergent_priors"); + for (map::const_iterator p = divergent_priors.begin(); + p != divergent_priors.end(); + ++p) { + f->open_object_section("prior"); + f->dump_stream("version") << p->first; + f->dump_stream("object") << p->second; + f->close_section(); + } + f->close_section(); } static void generate_test_instances(list& o) { o.push_back(new OndiskLog); @@ -1791,9 +1801,11 @@ public: static void read_log(ObjectStore *store, coll_t coll, hobject_t log_oid, const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, pg_missing_t &missing, ostringstream &oss, const PG *passedpg = NULL); + static void read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid, + const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log, + pg_missing_t &missing, ostringstream &oss, const PG *passedpg = NULL); bool check_log_for_corruption(ObjectStore *store); void trim(ObjectStore::Transaction& t, eversion_t v); - void trim_ondisklog(ObjectStore::Transaction& t); void trim_peers(); std::string get_corrupt_pg_log_name() const; diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index d6faa273c47f..8ef0c9b58a14 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -1710,6 +1710,34 @@ void pg_query_t::generate_test_instances(list& o) // -- pg_log_entry_t -- +string pg_log_entry_t::get_key_name() const +{ + char key[40]; + snprintf(key, sizeof(key), "%010u.%020lu", version.epoch, version.version); + return string(key); +} + +void pg_log_entry_t::encode_with_checksum(bufferlist& bl) const +{ + bufferlist ebl(sizeof(*this)*2); + encode(ebl); + __u32 crc = ebl.crc32c(0); + ::encode(ebl, bl); + ::encode(crc, bl); +} + +void pg_log_entry_t::decode_with_checksum(bufferlist::iterator& p) +{ + bufferlist bl; + ::decode(bl, p); + __u32 crc; + ::decode(crc, p); + if (crc != bl.crc32c(0)) + throw buffer::malformed_input("bad checksum on pg_log_entry_t"); + bufferlist::iterator q = bl.begin(); + decode(q); +} + void pg_log_entry_t::encode(bufferlist &bl) const { ENCODE_START(7, 4, bl); diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 66818bc07570..1d793ec515b3 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -1318,6 +1318,10 @@ struct pg_log_entry_t { return reqid != osd_reqid_t() && (op == MODIFY || op == DELETE); } + string get_key_name() const; + void encode_with_checksum(bufferlist& bl) const; + void decode_with_checksum(bufferlist::iterator& p); + void encode(bufferlist &bl) const; void decode(bufferlist::iterator &bl); void dump(Formatter *f) const;