last_peering_reset /* epoch to create pg at */);
// send some recent log, so that op dup detection works well.
- m->log.copy_up_to(pg_log.get_log(), cct->_conf->osd_min_pg_log_entries);
+ m->log.copy_up_to(cct, pg_log.get_log(), cct->_conf->osd_min_pg_log_entries);
m->info.log_tail = m->log.tail;
pi.log_tail = m->log.tail; // sigh...
get_osdmap_epoch(), info,
last_peering_reset /* epoch to create pg at */);
// send new stuff to append to replicas log
- m->log.copy_after(pg_log.get_log(), pi.last_update);
+ m->log.copy_after(cct, pg_log.get_log(), pi.last_update);
}
// share past_intervals if we are creating the pg on the replica
<< ", sending full log instead";
mlog->log = pg_log.get_log(); // primary should not have requested this!!
} else
- mlog->log.copy_after(pg_log.get_log(), query.since);
+ mlog->log.copy_after(cct, pg_log.get_log(), query.since);
}
else if (query.type == pg_query_t::FULLLOG) {
dout(10) << " sending info+missing+full log" << dendl;
o.back()->log.push_back(**p);
}
-void pg_log_t::copy_after(const pg_log_t &other, eversion_t v)
+static void _handle_dups(CephContext* cct, pg_log_t &target, const pg_log_t &other, unsigned maxdups)
+{
+ auto earliest_dup_version =
+ target.head.version < maxdups ? 0u : target.head.version - maxdups + 1;
+ lgeneric_subdout(cct, osd, 20) << "copy_up_to/copy_after earliest_dup_version " << earliest_dup_version << dendl;
+
+ for (auto d = other.dups.cbegin(); d != other.dups.cend(); ++d) {
+ if (d->version.version >= earliest_dup_version) {
+ lgeneric_subdout(cct, osd, 20)
+ << "copy_up_to/copy_after copy dup version "
+ << d->version << dendl;
+ target.dups.push_back(pg_log_dup_t(*d));
+ }
+ }
+
+ for (auto i = other.log.cbegin(); i != other.log.cend(); ++i) {
+ ceph_assert(i->version > other.tail);
+ if (i->version > target.tail)
+ break;
+ if (i->version.version >= earliest_dup_version) {
+ lgeneric_subdout(cct, osd, 20)
+ << "copy_up_to/copy_after copy dup from log version "
+ << i->version << dendl;
+ target.dups.push_back(pg_log_dup_t(*i));
+ }
+ }
+}
+
+
+void pg_log_t::copy_after(CephContext* cct, const pg_log_t &other, eversion_t v)
{
can_rollback_to = other.can_rollback_to;
head = other.head;
tail = other.tail;
+ lgeneric_subdout(cct, osd, 20) << __func__ << " v " << v << dendl;
for (list<pg_log_entry_t>::const_reverse_iterator i = other.log.rbegin();
i != other.log.rend();
++i) {
tail = i->version;
break;
}
+ lgeneric_subdout(cct, osd, 20) << __func__ << " copy log version " << i->version << dendl;
log.push_front(*i);
}
+ _handle_dups(cct, *this, other, cct->_conf->osd_pg_log_dups_tracked);
}
-void pg_log_t::copy_up_to(const pg_log_t &other, int max)
+void pg_log_t::copy_up_to(CephContext* cct, const pg_log_t &other, int max)
{
can_rollback_to = other.can_rollback_to;
int n = 0;
head = other.head;
tail = other.tail;
+ lgeneric_subdout(cct, osd, 20) << __func__ << " max " << max << dendl;
for (list<pg_log_entry_t>::const_reverse_iterator i = other.log.rbegin();
i != other.log.rend();
++i) {
+ ceph_assert(i->version > other.tail);
if (n++ >= max) {
tail = i->version;
break;
}
+ lgeneric_subdout(cct, osd, 20) << __func__ << " copy log version " << i->version << dendl;
log.push_front(*i);
}
+ _handle_dups(cct, *this, other, cct->_conf->osd_pg_log_dups_tracked);
}
ostream& pg_log_t::print(ostream& out) const
* @param other pg_log_t to copy from
* @param from copy entries after this version
*/
- void copy_after(const pg_log_t &other, eversion_t from);
+ void copy_after(CephContext* cct, const pg_log_t &other, eversion_t from);
/**
* copy up to N entries
* @param other source log
* @param max max number of entries to copy
*/
- void copy_up_to(const pg_log_t &other, int max);
+ void copy_up_to(CephContext* cct, const pg_log_t &other, int max);
ostream& print(ostream& out) const;