-void PG::IndexedLog::trim(ObjectStore::Transaction& t, eversion_t s)
+void PG::IndexedLog::trim(ObjectStore::Transaction& t, hobject_t& log_oid, eversion_t s)
{
if (complete_to != log.end() &&
complete_to->version <= s) {
<< " on " << *this << dendl;
}
+ set<string> keys_to_rm;
while (!log.empty()) {
pg_log_entry_t &e = *log.begin();
if (e.version > s)
break;
generic_dout(20) << "trim " << e << dendl;
unindex(e); // remove from index,
+ keys_to_rm.insert(e.get_key_name());
log.pop_front(); // from log
}
+ t.omap_rmkeys(coll_t::META_COLL, log_oid, keys_to_rm);
// raise tail?
if (tail < s)
void PG::write_log(ObjectStore::Transaction& t)
{
dout(10) << "write_log" << dendl;
-
- // assemble buffer
- bufferlist bl;
-
- // build buffer
- ondisklog.tail = 0;
+ t.remove(coll_t::META_COLL, log_oid);
+ t.touch(coll_t::META_COLL, log_oid);
+ map<string,bufferlist> keys;
for (list<pg_log_entry_t>::iterator p = log.log.begin();
p != log.log.end();
p++) {
- uint64_t startoff = bl.length();
-
- bufferlist ebl(sizeof(*p)*2);
- ::encode(*p, ebl);
- __u32 crc = ebl.crc32c(0);
- ::encode(ebl, bl);
- ::encode(crc, bl);
-
- p->offset = startoff;
+ bufferlist bl(sizeof(*p) * 2);
+ p->encode_with_checksum(bl);
+ keys[p->get_key_name()].claim(bl);
}
- ondisklog.head = bl.length();
- ondisklog.has_checksums = true;
+ dout(10) << "write_log " << keys.size() << " keys" << dendl;
- // write it
- t.remove(coll_t::META_COLL, log_oid );
- t.write(coll_t::META_COLL, log_oid , 0, bl.length(), bl);
+ ::encode(ondisklog.divergent_priors, keys["divergent_priors"]);
+
+ t.omap_setkeys(coll_t::META_COLL, log_oid, keys);
- bufferlist blb(sizeof(ondisklog));
- ::encode(ondisklog, blb);
- t.collection_setattr(coll, "ondisklog", blb);
-
- dout(10) << "write_log to " << ondisklog.tail << "~" << ondisklog.length() << dendl;
dirty_log = false;
}
assert(trim_to <= info.last_complete);
dout(10) << "trim " << log << " to " << trim_to << dendl;
- log.trim(t, trim_to);
+ log.trim(t, log_oid, trim_to);
info.log_tail = log.tail;
- trim_ondisklog(t);
}
}
-void PG::trim_ondisklog(ObjectStore::Transaction& t)
-{
- uint64_t new_tail;
- if (log.empty()) {
- new_tail = ondisklog.head;
- } else {
- new_tail = log.log.front().offset;
- }
- bool same_block = (new_tail & ~4095) == (ondisklog.tail & ~4095);
- dout(15) << "trim_ondisklog tail " << ondisklog.tail << " -> " << new_tail
- << ", now " << new_tail << "~" << (ondisklog.head - new_tail)
- << " " << (same_block ? "(same block)" : "(different block)")
- << dendl;
- assert(new_tail >= ondisklog.tail);
-
- if (same_block)
- return;
-
- ondisklog.tail = new_tail;
-
- if (!g_conf->osd_preserve_trimmed_log) {
- uint64_t zt = new_tail & ~4095;
- if (zt > ondisklog.zero_to) {
- t.zero(coll_t::META_COLL, log_oid, ondisklog.zero_to, zt - ondisklog.zero_to);
- dout(15) << "trim_ondisklog zeroing from " << ondisklog.zero_to
- << " to " << zt << dendl;
- ondisklog.zero_to = zt;
- }
- }
-
- bufferlist blb(sizeof(ondisklog));
- ::encode(ondisklog, blb);
- t.collection_setattr(coll, "ondisklog", blb);
-}
-
void PG::trim_peers()
{
calc_trim_to();
// log mutation
log.add(e);
- if (ondisklog.has_checksums) {
- bufferlist ebl(sizeof(e)*2);
- ::encode(e, ebl);
- __u32 crc = ebl.crc32c(0);
- ::encode(ebl, log_bl);
- ::encode(crc, log_bl);
- } else {
- ::encode(e, log_bl);
- }
dout(10) << "add_log_entry " << e << dendl;
+
+ e.encode_with_checksum(log_bl);
}
{
dout(10) << "append_log " << log << " " << logv << dendl;
- bufferlist bl;
+ map<string,bufferlist> keys;
for (vector<pg_log_entry_t>::iterator p = logv.begin();
p != logv.end();
p++) {
- p->offset = ondisklog.head + bl.length();
- add_log_entry(*p, bl);
+ p->offset = 0;
+ add_log_entry(*p, keys[p->get_key_name()]);
}
- dout(10) << "append_log " << ondisklog.tail << "~" << ondisklog.length()
- << " adding " << bl.length() << dendl;
-
- t.write(coll_t::META_COLL, log_oid, ondisklog.head, bl.length(), bl );
- ondisklog.head += bl.length();
-
- bufferlist blb(sizeof(ondisklog));
- ::encode(ondisklog, blb);
- t.collection_setattr(coll, "ondisklog", blb);
- dout(10) << "append_log now " << ondisklog.tail << "~" << ondisklog.length() << dendl;
+ dout(10) << "append_log adding " << keys.size() << " keys" << dendl;
+ t.omap_setkeys(coll_t::META_COLL, log_oid, keys);
trim(t, trim_to);
osd->infos_oid, snap_collections, info_struct_v);
assert(r >= 0);
- try {
- ostringstream oss;
- read_log(store, coll, log_oid, info, ondisklog, log, missing, oss, this);
- if (oss.str().length())
- osd->clog.error() << oss.str();
- }
- catch (const buffer::error &e) {
- string cr_log_coll_name(get_corrupt_pg_log_name());
- dout(0) << "Got exception '" << e.what() << "' while reading log. "
- << "Moving corrupted log file to '" << cr_log_coll_name
- << "' for later " << "analysis." << dendl;
-
- ondisklog.zero();
-
- // clear log index
- log.head = log.tail = info.last_update;
-
- // reset info
- info.log_tail = info.last_update;
-
- // Move the corrupt log to a new place and create a new zero-length log entry.
- ObjectStore::Transaction t;
- coll_t cr_log_coll(cr_log_coll_name);
- t.create_collection(cr_log_coll);
- t.collection_move(cr_log_coll, coll_t::META_COLL, log_oid);
- t.touch(coll_t::META_COLL, log_oid);
- dirty_info = true;
- write_if_dirty(t);
- store->apply_transaction(t);
-
- info.last_backfill = hobject_t();
- info.stats.stats.clear();
- }
+ ostringstream oss;
+ read_log(store, coll, log_oid, info, ondisklog, log, missing, oss, this);
+ if (oss.str().length())
+ osd->clog.error() << oss;
// log any weirdness
log_weirdness();
const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
pg_missing_t &missing, ostringstream &oss, const PG *passedpg)
{
- // load bounds
- ondisklog.tail = ondisklog.head = 0;
+ dout(10) << "read_log" << dendl;
- bufferlist blb;
- store->collection_getattr(coll, "ondisklog", blb);
- bufferlist::iterator p = blb.begin();
- ::decode(ondisklog, p);
+ // legacy?
+ struct stat st;
+ int r = store->stat(coll_t::META_COLL, log_oid, &st);
+ assert(r == 0);
+ if (st.st_size > 0) {
+ read_log_old(store, coll, log_oid, info, ondisklog, log, missing, oss, passedpg);
+ return;
+ }
+
+ log.tail = info.log_tail;
+ ObjectMap::ObjectMapIterator p = store->get_omap_iterator(coll_t::META_COLL, log_oid);
+ if (p) for (p->seek_to_first(); p->valid() ; p->next()) {
+ bufferlist bl = p->value();//Copy bufferlist before creating iterator
+ bufferlist::iterator bp = bl.begin();
+ if (p->key() == "divergent_priors") {
+ ::decode(ondisklog.divergent_priors, bp);
+ dout(20) << "read_log " << ondisklog.divergent_priors.size() << " divergent_priors" << dendl;
+ } else {
+ pg_log_entry_t e;
+ e.decode_with_checksum(bp);
+ dout(20) << "read_log " << e << dendl;
+ log.log.push_back(e);
+ log.head = e.version;
+ }
+ }
+ log.head = info.last_update;
+ log.index();
- dout(10) << "read_log " << ondisklog.tail << "~" << ondisklog.length() << dendl;
+ // build missing
+ if (info.last_complete < info.last_update) {
+ dout(10) << "read_log checking for missing items over interval (" << info.last_complete
+ << "," << info.last_update << "]" << dendl;
+ set<hobject_t> did;
+ for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin();
+ i != log.log.rend();
+ i++) {
+ if (i->version <= info.last_complete) break;
+ if (did.count(i->soid)) continue;
+ did.insert(i->soid);
+
+ if (i->is_delete()) continue;
+
+ bufferlist bv;
+ int r = store->getattr(coll, i->soid, OI_ATTR, bv);
+ if (r >= 0) {
+ object_info_t oi(bv);
+ if (oi.version < i->version) {
+ dout(15) << "read_log missing " << *i << " (have " << oi.version << ")" << dendl;
+ missing.add(i->soid, i->version, oi.version);
+ }
+ } else {
+ dout(15) << "read_log missing " << *i << dendl;
+ missing.add(i->soid, i->version, eversion_t());
+ }
+ }
+ for (map<eversion_t, hobject_t>::reverse_iterator i =
+ ondisklog.divergent_priors.rbegin();
+ i != ondisklog.divergent_priors.rend();
+ ++i) {
+ if (i->first <= info.last_complete) break;
+ if (did.count(i->second)) continue;
+ did.insert(i->second);
+ bufferlist bv;
+ int r = store->getattr(coll, i->second, OI_ATTR, bv);
+ if (r >= 0) {
+ object_info_t oi(bv);
+ /**
+ * 1) we see this entry in the divergent priors mapping
+ * 2) we didn't see an entry for this object in the log
+ *
+ * From 1 & 2 we know that either the object does not exist
+ * or it is at the version specified in the divergent_priors
+ * map since the object would have been deleted atomically
+ * with the addition of the divergent_priors entry, an older
+ * version would not have been recovered, and a newer version
+ * would show up in the log above.
+ */
+ assert(oi.version == i->first);
+ } else {
+ dout(15) << "read_log missing " << *i << dendl;
+ missing.add(i->second, i->first, eversion_t());
+ }
+ }
+ }
+ dout(10) << "read_log done" << dendl;
+}
+
+void PG::read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid,
+ const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
+ pg_missing_t &missing, ostringstream &oss, const PG *passedpg)
+{
+ // load bounds, based on old OndiskLog encoding.
+ uint64_t ondisklog_tail = 0;
+ uint64_t ondisklog_head = 0;
+ uint64_t ondisklog_zero_to;
+ bool ondisklog_has_checksums;
+
+ bufferlist blb;
+ store->collection_getattr(coll, "ondisklog", blb);
+ {
+ bufferlist::iterator bl = blb.begin();
+ DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
+ ondisklog_has_checksums = (struct_v >= 2);
+ ::decode(ondisklog_tail, bl);
+ ::decode(ondisklog_head, bl);
+ if (struct_v >= 4)
+ ::decode(ondisklog_zero_to, bl);
+ else
+ ondisklog_zero_to = 0;
+ if (struct_v >= 5)
+ ::decode(ondisklog.divergent_priors, bl);
+ DECODE_FINISH(bl);
+ }
+ uint64_t ondisklog_length = ondisklog_head - ondisklog_tail;
+ dout(10) << "read_log " << ondisklog_tail << "~" << ondisklog_length << dendl;
+
log.tail = info.log_tail;
// In case of sobject_t based encoding, may need to list objects in the store
bool listed_collection = false;
vector<hobject_t> ls;
- if (ondisklog.head > 0) {
+ if (ondisklog_head > 0) {
// read
bufferlist bl;
- store->read(coll_t::META_COLL, log_oid, ondisklog.tail, ondisklog.length(), bl);
- if (bl.length() < ondisklog.length()) {
+ store->read(coll_t::META_COLL, log_oid, ondisklog_tail, ondisklog_length, bl);
+ if (bl.length() < ondisklog_length) {
std::ostringstream oss;
oss << "read_log got " << bl.length() << " bytes, expected "
- << ondisklog.head << "-" << ondisklog.tail << "="
- << ondisklog.length();
+ << ondisklog_head << "-" << ondisklog_tail << "="
+ << ondisklog_length;
throw read_log_error(oss.str().c_str());
}
eversion_t last;
bool reorder = false;
while (!p.end()) {
- uint64_t pos = ondisklog.tail + p.get_off();
- if (ondisklog.has_checksums) {
+ uint64_t pos = ondisklog_tail + p.get_off();
+ if (ondisklog_has_checksums) {
bufferlist ebl;
::decode(ebl, p);
__u32 crc;
}
e.offset = pos;
- uint64_t endpos = ondisklog.tail + p.get_off();
+ uint64_t endpos = ondisklog_tail + p.get_off();
log.log.push_back(e);
last = e.version;
// [repair] at end of log?
if (!p.end() && e.version == info.last_update) {
oss << info.pgid << " log has extra data at "
- << endpos << "~" << (ondisklog.head-endpos) << " after "
- << info.last_update << "\n";
+ << endpos << "~" << (ondisklog_head-endpos) << " after "
+ << info.last_update << "\n";
dout(0) << "read_log " << endpos << " *** extra gunk at end of log, "
- << "adjusting ondisklog.head" << dendl;
- ondisklog.head = endpos;
+ << "adjusting ondisklog_head" << dendl;
+ ondisklog_head = endpos;
break;
}
}
log.log.push_back(p->second);
}
}
-
- log.head = info.last_update;
- log.index();
-
- // build missing
- if (info.last_complete < info.last_update) {
- dout(10) << "read_log checking for missing items over interval (" << info.last_complete
- << "," << info.last_update << "]" << dendl;
-
- set<hobject_t> did;
- for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin();
- i != log.log.rend();
- i++) {
- if (i->version <= info.last_complete) break;
- if (did.count(i->soid)) continue;
- did.insert(i->soid);
-
- if (i->is_delete()) continue;
-
- bufferlist bv;
- int r = store->getattr(coll, i->soid, OI_ATTR, bv);
- if (r >= 0) {
- object_info_t oi(bv);
- if (oi.version < i->version) {
- dout(15) << "read_log missing " << *i << " (have " << oi.version << ")" << dendl;
- missing.add(i->soid, i->version, oi.version);
- }
- } else {
- dout(15) << "read_log missing " << *i << dendl;
- missing.add(i->soid, i->version, eversion_t());
- }
- }
- for (map<eversion_t, hobject_t>::reverse_iterator i =
- ondisklog.divergent_priors.rbegin();
- i != ondisklog.divergent_priors.rend();
- ++i) {
- if (i->first <= info.last_complete) break;
- if (did.count(i->second)) continue;
- did.insert(i->second);
- bufferlist bv;
- int r = store->getattr(coll, i->second, OI_ATTR, bv);
- if (r >= 0) {
- object_info_t oi(bv);
- /**
- * 1) we see this entry in the divergent priors mapping
- * 2) we didn't see an entry for this object in the log
- *
- * From 1 & 2 we know that either the object does not exist
- * or it is at the version specified in the divergent_priors
- * map since the object would have been deleted atomically
- * with the addition of the divergent_priors entry, an older
- * version would not have been recovered, and a newer version
- * would show up in the log above.
- */
- assert(oi.version == i->first);
- } else {
- dout(15) << "read_log missing " << *i << dendl;
- missing.add(i->second, i->first, eversion_t());
- }
- }
- }
- dout(10) << "read_log done" << dendl;
}
/*------------ Recovery State Machine----------------*/