if (olist.size() != info.stats.num_objects)
dout(10) << " WARNING: " << olist.size() << " != num_objects " << info.stats.num_objects << dendl;
-
+ /*
+ * note that we don't create prior_version backlog entries for
+ * objects that no longer exist. that's maybe a bit sloppy, but not
+ * a problem, since we mainly care about generating an accurate
+ * missing map, and an object that gets deleted will obviously not
+ * end up missing; in merge_log, we'll see the final remove entry,
+ * and missing.rm().
+ */
+
int local = 0;
map<eversion_t,Log::Entry> add;
for (vector<pobject_t>::iterator it = olist.begin();
local++;
pobject_t poid = pobject_t(info.pgid.pool(), 0, it->oid);
- if (log.logged_object(poid.oid)) continue; // already have it logged.
+ /*
+ * we can skip an object if
+ * - is already in the log AND
+ * - it is a totally new object OR
+ * - the prior_version is also already in the log
+ * otherwise, we need to include it.
+ */
+ eversion_t version;
+ if (log.objects.count(poid.oid)) {
+ // note the prior version
+ version = log.objects[poid.oid]->prior_version;
+ if (version == eversion_t() || // either new object, or
+ version >= log.bottom) // prior_version also already in log
+ continue; // already have it logged.
+
+ // ok, create backlog entry for _prior_ version!
+ } else {
+ osd->store->getattr(info.pgid.to_coll(), poid, "version",
+ &version, sizeof(version));
+
+ }
- // add entry
+ // add backlog entry.
Log::Entry e;
e.oid = poid.oid;
+ e.version = version;
if (poid.oid.snap && poid.oid.snap < CEPH_NOSNAP) {
e.op = Log::Entry::CLONE;
osd->store->getattr(info.pgid.to_coll(), poid, "snaps", e.snaps);
} else {
e.op = Log::Entry::MODIFY; // FIXME when we do smarter op codes!
}
- osd->store->getattr(info.pgid.to_coll(), poid, "version",
- &e.version, sizeof(e.version));
add[e.version] = e;
dout(10) << "generate_backlog found " << e << dendl;
}